Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/12/29 03:32:43 UTC

[1/2] carbondata git commit: [CARBONDATA-3194] Integrating Carbon with Presto using hive connector

Repository: carbondata
Updated Branches:
  refs/heads/master 7c4e79fca -> e193df0a1


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
index e96cbf7..27462b0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/PrestoFilterUtil.java
@@ -47,27 +47,21 @@ import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
 import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.expression.logical.OrExpression;
 
-import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveType;
 import com.facebook.presto.spi.PrestoException;
 import com.facebook.presto.spi.predicate.Domain;
 import com.facebook.presto.spi.predicate.Range;
 import com.facebook.presto.spi.predicate.TupleDomain;
-import com.facebook.presto.spi.type.BigintType;
-import com.facebook.presto.spi.type.BooleanType;
-import com.facebook.presto.spi.type.DateType;
-import com.facebook.presto.spi.type.DecimalType;
 import com.facebook.presto.spi.type.Decimals;
-import com.facebook.presto.spi.type.DoubleType;
-import com.facebook.presto.spi.type.IntegerType;
-import com.facebook.presto.spi.type.SmallintType;
-import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.type.VarcharType;
 import io.airlift.slice.Slice;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 
 import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
 import static com.google.common.base.Preconditions.checkArgument;
 
+
 /**
  * PrestoFilterUtil creates the CarbonData Expression from the Presto domain
  */
@@ -78,31 +72,30 @@ public class PrestoFilterUtil {
   private static final String HIVE_DEFAULT_DYNAMIC_PARTITION = "__HIVE_DEFAULT_PARTITION__";
 
   /**
-   * @param carbondataColumnHandle
+   * @param columnHandle
    * @return
    */
-  private static DataType spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
-    Type colType = carbondataColumnHandle.getColumnType();
-    if (colType == BooleanType.BOOLEAN) {
+  private static DataType spi2CarbondataTypeMapper(HiveColumnHandle columnHandle) {
+    HiveType colType = columnHandle.getHiveType();
+    if (colType.equals(HiveType.HIVE_BOOLEAN)) {
       return DataTypes.BOOLEAN;
-    } else if (colType == SmallintType.SMALLINT) {
+    } else if (colType.equals(HiveType.HIVE_SHORT)) {
       return DataTypes.SHORT;
-    } else if (colType == IntegerType.INTEGER) {
+    } else if (colType.equals(HiveType.HIVE_INT)) {
       return DataTypes.INT;
-    } else if (colType == BigintType.BIGINT) {
+    } else if (colType.equals(HiveType.HIVE_LONG)) {
       return DataTypes.LONG;
-    } else if (colType == DoubleType.DOUBLE) {
+    } else if (colType.equals(HiveType.HIVE_DOUBLE)) {
       return DataTypes.DOUBLE;
-    } else if (colType == VarcharType.VARCHAR) {
+    } else if (colType.equals(HiveType.HIVE_STRING)) {
       return DataTypes.STRING;
-    } else if (colType == DateType.DATE) {
+    } else if (colType.equals(HiveType.HIVE_DATE)) {
       return DataTypes.DATE;
-    } else if (colType == TimestampType.TIMESTAMP) {
+    } else if (colType.equals(HiveType.HIVE_TIMESTAMP)) {
       return DataTypes.TIMESTAMP;
-    } else if (colType.equals(DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
-        carbondataColumnHandle.getScale()))) {
-      return DataTypes.createDecimalType(carbondataColumnHandle.getPrecision(),
-          carbondataColumnHandle.getScale());
+    } else if (colType.getTypeInfo() instanceof DecimalTypeInfo) {
+      DecimalTypeInfo typeInfo = (DecimalTypeInfo) colType.getTypeInfo();
+      return DataTypes.createDecimalType(typeInfo.getPrecision(),typeInfo.getScale());
     } else {
       return DataTypes.STRING;
     }
@@ -115,16 +108,15 @@ public class PrestoFilterUtil {
    * @return
    */
   public static List<String> getPartitionFilters(CarbonTable carbonTable,
-      TupleDomain<ColumnHandle> originalConstraint) {
+      TupleDomain<HiveColumnHandle> originalConstraint) {
     List<ColumnSchema> columnSchemas = carbonTable.getPartitionInfo().getColumnSchemaList();
     List<String> filter = new ArrayList<>();
-    for (ColumnHandle columnHandle : originalConstraint.getDomains().get().keySet()) {
-      CarbondataColumnHandle carbondataColumnHandle = (CarbondataColumnHandle) columnHandle;
+    for (HiveColumnHandle columnHandle : originalConstraint.getDomains().get().keySet()) {
       List<ColumnSchema> partitionedColumnSchema = columnSchemas.stream().filter(
-          columnSchema -> carbondataColumnHandle.getColumnName()
+          columnSchema -> columnHandle.getName()
               .equals(columnSchema.getColumnName())).collect(toList());
       if (partitionedColumnSchema.size() != 0) {
-        filter.addAll(createPartitionFilters(originalConstraint, carbondataColumnHandle));
+        filter.addAll(createPartitionFilters(originalConstraint, columnHandle));
       }
     }
     return filter;
@@ -132,46 +124,49 @@ public class PrestoFilterUtil {
 
   /** Returns the list of partition keys and values using domain constraints
    * @param originalConstraint
-   * @param carbonDataColumnHandle
+   * @param columnHandle
    */
-  private static List<String> createPartitionFilters(TupleDomain<ColumnHandle> originalConstraint,
-      CarbondataColumnHandle carbonDataColumnHandle) {
+  private static List<String> createPartitionFilters(
+      TupleDomain<HiveColumnHandle> originalConstraint, HiveColumnHandle columnHandle) {
     List<String> filter = new ArrayList<>();
-    Domain domain = originalConstraint.getDomains().get().get(carbonDataColumnHandle);
+    if (!originalConstraint.getDomains().isPresent()) {
+      return filter;
+    }
+    Domain domain = originalConstraint.getDomains().get().get(columnHandle);
     if (domain != null && domain.isNullableSingleValue()) {
       Object value = domain.getNullableSingleValue();
       Type type = domain.getType();
       if (value == null) {
-        filter.add(carbonDataColumnHandle.getColumnName() + "=" + HIVE_DEFAULT_DYNAMIC_PARTITION);
-      } else if (carbonDataColumnHandle.getColumnType() instanceof DecimalType) {
-        int scale = ((DecimalType) carbonDataColumnHandle.getColumnType()).getScale();
+        filter.add(columnHandle.getName() + "=" + HIVE_DEFAULT_DYNAMIC_PARTITION);
+      } else if (columnHandle.getHiveType().getTypeInfo() instanceof DecimalTypeInfo) {
+        int scale = ((DecimalTypeInfo) columnHandle.getHiveType().getTypeInfo()).getScale();
         if (value instanceof Long) {
           //create decimal value from Long
           BigDecimal decimalValue = new BigDecimal(new BigInteger(String.valueOf(value)), scale);
-          filter.add(carbonDataColumnHandle.getColumnName() + "=" + decimalValue.toString());
+          filter.add(columnHandle.getName() + "=" + decimalValue.toString());
         } else if (value instanceof Slice) {
           //create decimal value from Slice
           BigDecimal decimalValue =
               new BigDecimal(Decimals.decodeUnscaledValue((Slice) value), scale);
-          filter.add(carbonDataColumnHandle.getColumnName() + "=" + decimalValue.toString());
+          filter.add(columnHandle.getName() + "=" + decimalValue.toString());
         }
       } else if (value instanceof Slice) {
-        filter.add(carbonDataColumnHandle.getColumnName() + "=" + ((Slice) value).toStringUtf8());
-      } else if (value instanceof Long && carbonDataColumnHandle.getColumnType()
-          .equals(DateType.DATE)) {
+        filter.add(columnHandle.getName() + "=" + ((Slice) value).toStringUtf8());
+      } else if (value instanceof Long && columnHandle.getHiveType()
+          .equals(HiveType.HIVE_DATE)) {
         Calendar c = Calendar.getInstance();
         c.setTime(new java.sql.Date(0));
         c.add(Calendar.DAY_OF_YEAR, ((Long) value).intValue());
         java.sql.Date date = new java.sql.Date(c.getTime().getTime());
-        filter.add(carbonDataColumnHandle.getColumnName() + "=" + date.toString());
-      } else if (value instanceof Long && carbonDataColumnHandle.getColumnType()
-          .equals(TimestampType.TIMESTAMP)) {
+        filter.add(columnHandle.getName() + "=" + date.toString());
+      } else if (value instanceof Long && columnHandle.getHiveType()
+          .equals(HiveType.HIVE_TIMESTAMP)) {
         String timeStamp = new Timestamp((Long) value).toString();
-        filter.add(carbonDataColumnHandle.getColumnName() + "=" + timeStamp
+        filter.add(columnHandle.getName() + "=" + timeStamp
             .substring(0, timeStamp.indexOf('.')));
       } else if ((value instanceof Boolean) || (value instanceof Double)
           || (value instanceof Long)) {
-        filter.add(carbonDataColumnHandle.getColumnName() + "=" + value.toString());
+        filter.add(columnHandle.getName() + "=" + value.toString());
       } else {
         throw new PrestoException(NOT_SUPPORTED,
             format("Unsupported partition key type: %s", type.getDisplayName()));
@@ -186,23 +181,26 @@ public class PrestoFilterUtil {
    * @param originalConstraint presto-TupleDomain
    * @return
    */
-  static Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint) {
+  static Expression parseFilterExpression(TupleDomain<HiveColumnHandle> originalConstraint) {
 
     Domain domain;
 
+    if (originalConstraint.isNone()) {
+      return null;
+    }
+
     // final expression for the table,
     // returned by the method after combining all the column filters (colValueExpression).
     Expression finalFilters = null;
 
-    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+    for (HiveColumnHandle cdch : originalConstraint.getDomains().get().keySet()) {
 
       // Build ColumnExpression for Expression(Carbondata)
-      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
-      Type type = cdch.getColumnType();
+      HiveType type = cdch.getHiveType();
       DataType coltype = spi2CarbondataTypeMapper(cdch);
-      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+      Expression colExpression = new ColumnExpression(cdch.getName(), coltype);
 
-      domain = originalConstraint.getDomains().get().get(c);
+      domain = originalConstraint.getDomains().get().get(cdch);
       checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
       List<Object> singleValues = new ArrayList<>();
 
@@ -282,36 +280,38 @@ public class PrestoFilterUtil {
     return finalFilters;
   }
 
-  private static Object convertDataByType(Object rawdata, Type type) {
-    if (type.equals(IntegerType.INTEGER) || type.equals(SmallintType.SMALLINT)) {
+  private static Object convertDataByType(Object rawdata, HiveType type) {
+    if (type.equals(HiveType.HIVE_INT) || type.equals(HiveType.HIVE_SHORT)) {
       return Integer.valueOf(rawdata.toString());
-    } else if (type.equals(BigintType.BIGINT)) {
+    } else if (type.equals(HiveType.HIVE_LONG)) {
       return rawdata;
-    } else if (type.equals(VarcharType.VARCHAR)) {
+    } else if (type.equals(HiveType.HIVE_STRING)) {
       if (rawdata instanceof Slice) {
         return ((Slice) rawdata).toStringUtf8();
       } else {
         return rawdata;
       }
-    } else if (type.equals(BooleanType.BOOLEAN)) {
+    } else if (type.equals(HiveType.HIVE_BOOLEAN)) {
       return rawdata;
-    } else if (type.equals(DateType.DATE)) {
+    } else if (type.equals(HiveType.HIVE_DATE)) {
       Calendar c = Calendar.getInstance();
       c.setTime(new Date(0));
       c.add(Calendar.DAY_OF_YEAR, ((Long) rawdata).intValue());
       Date date = c.getTime();
       return date.getTime() * 1000;
-    } else if (type instanceof DecimalType) {
+    }
+    else if (type.getTypeInfo() instanceof DecimalTypeInfo) {
       if (rawdata instanceof Double) {
         return new BigDecimal((Double) rawdata);
       } else if (rawdata instanceof Long) {
         return new BigDecimal(new BigInteger(String.valueOf(rawdata)),
-            ((DecimalType) type).getScale());
+            ((DecimalTypeInfo) type.getTypeInfo()).getScale());
       } else if (rawdata instanceof Slice) {
         return new BigDecimal(Decimals.decodeUnscaledValue((Slice) rawdata),
-            ((DecimalType) type).getScale());
+            ((DecimalTypeInfo) type.getTypeInfo()).getScale());
       }
-    } else if (type.equals(TimestampType.TIMESTAMP)) {
+    }
+    else if (type.equals(HiveType.HIVE_TIMESTAMP)) {
       return (Long) rawdata * 1000;
     }
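
A quick illustration of the decimal decoding that both createPartitionFilters and convertDataByType above rely on: Presto hands a short decimal over as an unscaled Long (and a long decimal as a Slice, decoded via Decimals.decodeUnscaledValue), and the value is rebuilt using the scale taken from Hive's DecimalTypeInfo. A minimal self-contained sketch of the Long branch, using the bonus decimal(10,4) value from the tests below; the class name here is illustrative:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    public class DecimalDecodeSketch {
      public static void main(String[] args) {
        long unscaled = 12344440L;  // value as shipped by Presto
        int scale = 4;              // from DecimalTypeInfo.getScale()
        // Mirrors "new BigDecimal(new BigInteger(String.valueOf(value)), scale)"
        BigDecimal decoded = new BigDecimal(BigInteger.valueOf(unscaled), scale);
        System.out.println(decoded);  // prints 1234.4440
      }
    }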
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
index 37174c1..fd232ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalMultiBlockSplit.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.gson.Gson;
 
 /**
  * CarbonLocalInputSplit represents a block; it contains a set of blocklets.
@@ -71,8 +72,15 @@ public class CarbonLocalMultiBlockSplit {
     this.locations = locations;
   }
 
-  public static CarbonMultiBlockSplit convertSplit(
-      CarbonLocalMultiBlockSplit carbonLocalMultiBlockSplit) {
+  public String getJsonString() {
+    Gson gson = new Gson();
+    return gson.toJson(this);
+  }
+
+  public static CarbonMultiBlockSplit convertSplit(String multiSplitJson) {
+    Gson gson = new Gson();
+    CarbonLocalMultiBlockSplit carbonLocalMultiBlockSplit =
+        gson.fromJson(multiSplitJson, CarbonLocalMultiBlockSplit.class);
     List<CarbonInputSplit> carbonInputSplitList =
         carbonLocalMultiBlockSplit.getSplitList().stream().map(CarbonLocalInputSplit::convertSplit)
             .collect(Collectors.toList());
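
The split now travels as JSON: getJsonString() serializes it with Gson, and convertSplit(String) rebuilds it on the other side. A hedged sketch of that round trip; the stand-in class below is illustrative, not part of the patch:

    import com.google.gson.Gson;

    public class SplitRoundTripSketch {
      // Stand-in for CarbonLocalMultiBlockSplit; only shows the mechanism.
      static class FakeSplit { String fileId = "part-0"; }

      public static void main(String[] args) {
        Gson gson = new Gson();
        String json = gson.toJson(new FakeSplit());            // getJsonString() side
        FakeSplit back = gson.fromJson(json, FakeSplit.class); // convertSplit() side
        System.out.println(json + " -> " + back.fileId);
      }
    }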

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index ab1c871..7474da8 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.carbondata.presto.impl;
 
-import javax.validation.constraints.NotNull;
-
 import io.airlift.configuration.Config;
 
 /**
@@ -27,9 +25,6 @@ import io.airlift.configuration.Config;
 public class CarbonTableConfig {
 
   //read from config
-  private String dbPath;
-  private String tablePath;
-  private String storePath;
   private String unsafeMemoryInMb;
   private String enableUnsafeInQueryExecution;
   private String enableUnsafeColumnPage;
@@ -46,36 +41,6 @@ public class CarbonTableConfig {
   private String pushRowFilter;
 
 
-  @NotNull public String getDbPath() {
-    return dbPath;
-  }
-
-  @Config("carbondata-store")
-  public CarbonTableConfig setDbPath(String dbPath) {
-    this.dbPath = dbPath;
-    return this;
-  }
-
-  @NotNull public String getTablePath() {
-    return tablePath;
-  }
-
-  @Config("carbondata-store")
-  public CarbonTableConfig setTablePath(String tablePath) {
-    this.tablePath = tablePath;
-    return this;
-  }
-
-  @NotNull public String getStorePath() {
-    return storePath;
-  }
-
-  @Config("carbondata-store")
-  public CarbonTableConfig setStorePath(String storePath) {
-    this.storePath = storePath;
-    return this;
-  }
-
   public String getUnsafeMemoryInMb() {
     return unsafeMemoryInMb;
   }
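
With dbPath/tablePath/storePath removed, the catalog file no longer points at a carbondata-store; table locations come from the Hive metastore instead. A sketch of what $PRESTO_HOME/etc/catalog/carbondata.properties might look like under the new wiring, based on the properties the tests below set — any key beyond those is an assumption, not part of this patch:

    connector.name=carbondata
    hive.metastore=file
    hive.metastore.catalog.dir=file:///path/to/carbon/store
    carbon.unsafe.working.memory.in.mb=512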

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 363f3f5..7ad6568 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -27,20 +27,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
@@ -63,13 +59,9 @@ import org.apache.carbondata.presto.PrestoFilterUtil;
 import com.facebook.presto.hadoop.$internal.com.google.gson.Gson;
 import com.facebook.presto.hadoop.$internal.io.netty.util.internal.ConcurrentSet;
 import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.CollectionUtils;
-import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.hive.HiveColumnHandle;
 import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.TableNotFoundException;
-import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.predicate.TupleDomain;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
 import org.apache.commons.lang.time.DateUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -109,19 +101,11 @@ public class CarbonTableReader {
    */
   private ConcurrentSet<SchemaTableName> tableList;
   /**
-   * carbonFileList represents the store path of the schema, which is configured as carbondata-store
-   * in the CarbonData catalog file ($PRESTO_HOME$/etc/catalog/carbondata.properties).
-   */
-  private CarbonFile carbonFileList;
-  private FileFactory.FileType fileType;
-  /**
    * A cache for Carbon reader, with this cache,
    * metadata of a table is only read from file system once.
    */
   private AtomicReference<HashMap<SchemaTableName, CarbonTableCacheModel>> carbonCache;
 
-  private LoadMetadataDetails[] loadMetadataDetails;
-
   private String queryId;
 
   /**
@@ -139,7 +123,6 @@ public class CarbonTableReader {
     this.config = Objects.requireNonNull(config, "CarbonTableConfig is null");
     this.carbonCache = new AtomicReference(new HashMap());
     tableList = new ConcurrentSet<>();
-    setS3Properties();
     populateCarbonProperties();
   }
 
@@ -149,23 +132,11 @@ public class CarbonTableReader {
    * @param table the name of the table and schema.
    * @return
    */
-  public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
-
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table, String location,
+      Configuration config) {
     if (!carbonCache.get().containsKey(table) || carbonCache.get().get(table) == null) {
-      // if this table is not cached, try to read the metadata of the table and cache it.
-      try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
-          FileFactory.class.getClassLoader())) {
-        if (carbonFileList == null) {
-          fileType = FileFactory.getFileType(config.getStorePath());
-          try {
-            carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
-          } catch (Exception ex) {
-            throw new RuntimeException(ex);
-          }
-        }
-      }
-      updateSchemaTables(table);
-      parseCarbonMetadata(table);
+      updateSchemaTables(table, config);
+      parseCarbonMetadata(table, location, config);
     }
     if (carbonCache.get().containsKey(table)) {
       return carbonCache.get().get(table);
@@ -183,116 +154,22 @@ public class CarbonTableReader {
   }
 
   /**
-   * Return the schema names under a schema store path (this.carbonFileList).
-   *
-   * @return
-   */
-  public List<String> getSchemaNames() {
-    return updateSchemaList();
-  }
-
-  /**
-   * Get the CarbonFile instance which represents the store path in the configuration,
-   * and assign it to this.carbonFileList.
-   *
-   * @return
-   */
-  private boolean updateCarbonFile() {
-    if (carbonFileList == null) {
-      fileType = FileFactory.getFileType(config.getStorePath());
-      try {
-        carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
-      } catch (Exception ex) {
-        throw new RuntimeException(ex);
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Return the schema names under a schema store path (this.carbonFileList).
-   *
-   * @return
-   */
-  private List<String> updateSchemaList() {
-    updateCarbonFile();
-    if (carbonFileList != null) {
-      Stream.of(carbonFileList.listFiles()).forEach(this::getName);
-      return schemaNames;
-    } else return ImmutableList.of();
-  }
-
-  private void getName(CarbonFile carbonFile) {
-    if (!carbonFile.getName().equalsIgnoreCase("_system") && !carbonFile.getName()
-        .equalsIgnoreCase(".ds_store")) {
-      schemaNames.add(carbonFile.getName());
-    }
-  }
-
-  /**
-   * Get the names of the tables in the given schema.
-   *
-   * @param schema name of the schema
-   * @return
-   */
-  public Set<String> getTableNames(String schema) {
-    Objects.requireNonNull(schema, "schema is null");
-    return updateTableList(schema);
-  }
-
-  /**
-   * Get the names of the tables in the given schema.
-   *
-   * @param schemaName name of the schema
-   * @return
-   */
-  private Set<String> updateTableList(String schemaName) {
-    updateCarbonFile();
-    List<CarbonFile> schema =
-        Stream.of(carbonFileList.listFiles()).filter(a -> schemaName.equals(a.getName()))
-            .collect(Collectors.toList());
-    if (schema.size() > 0) {
-      return Stream.of((schema.get(0)).listFiles()).map(CarbonFile::getName)
-          .collect(Collectors.toSet());
-    } else return ImmutableSet.of();
-  }
-
-  /**
-   * Get the CarbonTable instance of the given table.
-   *
-   * @param schemaTableName name of the given table.
-   * @return
-   */
-  public CarbonTable getTable(SchemaTableName schemaTableName) {
-    try {
-      updateSchemaTables(schemaTableName);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    Objects.requireNonNull(schemaTableName, "schemaTableName is null");
-    return loadTableMetadata(schemaTableName);
-  }
-
-  /**
    * Find all the tables under the schema store path (this.carbonFileList)
    * and cache all the table names in this.tableList. Notice that whenever this method
    * is called, it clears this.tableList and populate the list by reading the files.
    */
-  private void updateSchemaTables(SchemaTableName schemaTableName) {
+  private void updateSchemaTables(SchemaTableName schemaTableName, Configuration config) {
     // update logic determine later
     boolean isKeyExists = carbonCache.get().containsKey(schemaTableName);
 
-    if (carbonFileList == null) {
-      updateSchemaList();
-    }
     if (isKeyExists) {
       CarbonTableCacheModel carbonTableCacheModel = carbonCache.get().get(schemaTableName);
       if (carbonTableCacheModel != null && carbonTableCacheModel.carbonTable.getTableInfo() != null
           && carbonTableCacheModel.carbonTable.isTransactionalTable()) {
         Long latestTime = FileFactory.getCarbonFile(CarbonTablePath
-            .getSchemaFilePath(carbonCache.get().get(schemaTableName).carbonTable.getTablePath()))
-            .getLastModifiedTime();
+                .getSchemaFilePath(
+                    carbonCache.get().get(schemaTableName).carbonTable.getTablePath()),
+            config).getLastModifiedTime();
         Long oldTime = carbonTableCacheModel.carbonTable.getTableInfo().getLastUpdatedTime();
         if (DateUtils.truncate(new Date(latestTime), Calendar.MINUTE)
             .after(DateUtils.truncate(new Date(oldTime), Calendar.MINUTE))) {
@@ -300,31 +177,6 @@ public class CarbonTableReader {
         }
       }
     }
-    if (!tableList.contains(schemaTableName)) {
-      for (CarbonFile cf : carbonFileList.listFiles()) {
-        if (!cf.getName().endsWith(".mdt")) {
-          for (CarbonFile table : cf.listFiles()) {
-            tableList.add(new SchemaTableName(cf.getName(), table.getName()));
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Find the table with the given name and build a CarbonTable instance for it.
-   * This method should be called after this.updateSchemaTables().
-   *
-   * @param schemaTableName name of the given table.
-   * @return
-   */
-  private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
-    for (SchemaTableName table : tableList) {
-      if (!table.equals(schemaTableName)) continue;
-
-      return parseCarbonMetadata(table);
-    }
-    throw new TableNotFoundException(schemaTableName);
   }
 
   /**
@@ -334,7 +186,8 @@ public class CarbonTableReader {
    * @param table name of the given table.
    * @return the CarbonTable instance which contains all the needed metadata for a table.
    */
-  private CarbonTable parseCarbonMetadata(SchemaTableName table) {
+  private CarbonTable parseCarbonMetadata(SchemaTableName table, String tablePath,
+      Configuration config) {
     CarbonTable result;
     try {
       CarbonTableCacheModel cache = carbonCache.get().get(table);
@@ -347,25 +200,9 @@ public class CarbonTableReader {
       // If table is not previously cached, then:
 
       // Step 1: get store path of the table and cache it.
-      // create table identifier. the table id is randomly generated.
-      CarbonTableIdentifier carbonTableIdentifier =
-          new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
-              UUID.randomUUID().toString());
-      String storePath = config.getStorePath();
-      String tablePath = storePath + "/" + carbonTableIdentifier.getDatabaseName() + "/"
-          + carbonTableIdentifier.getTableName();
-
       String metadataPath = CarbonTablePath.getSchemaFilePath(tablePath);
-      boolean isTransactionalTable = false;
-      try {
-        FileFactory.FileType fileType = FileFactory.getFileType(metadataPath);
-        if (FileFactory.getCarbonFile(metadataPath, fileType).isFileExist(metadataPath, fileType)) {
-          // If metadata folder exists, it is a transactional table
-          isTransactionalTable = true;
-        }
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
+      // If metadata folder exists, it is a transactional table
+      boolean isTransactionalTable = FileFactory.getCarbonFile(metadataPath, config).exists();
       org.apache.carbondata.format.TableInfo tableInfo;
       if (isTransactionalTable) {
         //Step 2: read the metadata (tableInfo) of the table.
@@ -379,13 +216,13 @@ public class CarbonTableReader {
           }
         };
         ThriftReader thriftReader =
-            new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase);
+            new ThriftReader(CarbonTablePath.getSchemaFilePath(tablePath), createTBase, config);
         thriftReader.open();
         tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
         thriftReader.close();
       } else {
         tableInfo = CarbonUtil
-            .inferSchema(tablePath, table.getTableName(), false, FileFactory.getConfiguration());
+            .inferSchema(tablePath, table.getTableName(), false, config);
       }
       // Step 3: convert format level TableInfo to code level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
@@ -400,8 +237,8 @@ public class CarbonTableReader {
       // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
-      cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-          table.getSchemaName(), table.getTableName());
+      cache.carbonTable =
+          CarbonMetadata.getInstance().getCarbonTable(table.getSchemaName(), table.getTableName());
 
       // cache the table
       carbonCache.get().put(table, cache);
@@ -415,12 +252,12 @@ public class CarbonTableReader {
   }
 
   public List<CarbonLocalMultiBlockSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
-      Expression filters, TupleDomain<ColumnHandle> constraints) throws IOException {
+      Expression filters, TupleDomain<HiveColumnHandle> constraints, Configuration config)
+      throws IOException {
     List<CarbonLocalInputSplit> result = new ArrayList<>();
     List<CarbonLocalMultiBlockSplit> multiBlockSplitList = new ArrayList<>();
     CarbonTable carbonTable = tableCacheModel.carbonTable;
     TableInfo tableInfo = tableCacheModel.carbonTable.getTableInfo();
-    Configuration config = new Configuration();
     config.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
     String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
     config.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
@@ -434,7 +271,7 @@ public class CarbonTableReader {
     List<PartitionSpec> filteredPartitions = new ArrayList();
 
     PartitionInfo partitionInfo = carbonTable.getPartitionInfo(carbonTable.getTableName());
-
+    LoadMetadataDetails[] loadMetadataDetails = null;
     if (partitionInfo != null && partitionInfo.getPartitionType() == PartitionType.NATIVE_HIVE) {
       try {
         loadMetadataDetails = SegmentStatusManager.readTableStatusFile(
@@ -493,7 +330,7 @@ public class CarbonTableReader {
    * @param carbonTable
    * @throws IOException
    */
-  private List<PartitionSpec> findRequiredPartitions(TupleDomain<ColumnHandle> constraints,
+  private List<PartitionSpec> findRequiredPartitions(TupleDomain<HiveColumnHandle> constraints,
       CarbonTable carbonTable, LoadMetadataDetails[] loadMetadataDetails) throws IOException {
     Set<PartitionSpec> partitionSpecs = new HashSet<>();
     List<PartitionSpec> prunePartitions = new ArrayList();
@@ -548,18 +385,19 @@ public class CarbonTableReader {
     addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME, "Presto_Server");
   }
 
-  private void setS3Properties() {
-    FileFactory.getConfiguration().set(ACCESS_KEY, Objects.toString(config.getS3A_AcesssKey(), ""));
-    FileFactory.getConfiguration().set(SECRET_KEY, Objects.toString(config.getS3A_SecretKey()));
-    FileFactory.getConfiguration()
+  public Configuration updateS3Properties(Configuration configuration) {
+    configuration.set(ACCESS_KEY, Objects.toString(config.getS3A_AcesssKey(), ""));
+    configuration.set(SECRET_KEY, Objects.toString(config.getS3A_SecretKey()));
+    configuration
         .set(CarbonCommonConstants.S3_ACCESS_KEY, Objects.toString(config.getS3_AcesssKey(), ""));
-    FileFactory.getConfiguration()
+    configuration
         .set(CarbonCommonConstants.S3_SECRET_KEY, Objects.toString(config.getS3_SecretKey()));
-    FileFactory.getConfiguration()
+    configuration
         .set(CarbonCommonConstants.S3N_ACCESS_KEY, Objects.toString(config.getS3N_AcesssKey(), ""));
-    FileFactory.getConfiguration()
+    configuration
         .set(CarbonCommonConstants.S3N_SECRET_KEY, Objects.toString(config.getS3N_SecretKey(), ""));
-    FileFactory.getConfiguration().set(ENDPOINT, Objects.toString(config.getS3EndPoint(), ""));
+    configuration.set(ENDPOINT, Objects.toString(config.getS3EndPoint(), ""));
+    return configuration;
   }
 
   private void addProperty(String propertyName, String propertyValue) {
@@ -576,10 +414,6 @@ public class CarbonTableReader {
     return cis.getLocations().toArray(new String[cis.getLocations().size()]);
   }
 
-  public String getQueryId() {
-    return queryId;
-  }
-
   public void setQueryId(String queryId) {
     this.queryId = queryId;
   }
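
CarbonTableReader's entry points now take the table location and a Hadoop Configuration from the Hive connector rather than deriving them from a configured store path. A hypothetical caller, only to show the new shapes — reader, table, tablePath, filters and constraints are placeholders supplied by the connector:

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical usage; every variable here is a placeholder.
    Configuration conf = reader.updateS3Properties(new Configuration());
    CarbonTableCacheModel cache = reader.getCarbonCache(table, tablePath, conf);
    List<CarbonLocalMultiBlockSplit> splits =
        reader.getInputSplits2(cache, filters, constraints, conf);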

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
index 3f4d839..4360977 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeLocalDictTest.scala
@@ -18,7 +18,7 @@
 package org.apache.carbondata.presto.integrationtest
 
 import java.io.File
-import java.sql.Timestamp
+import java.util
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@@ -27,7 +27,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.presto.server.PrestoServer
 
 
@@ -75,16 +75,25 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll
       systemPath)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
       "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+
+    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.execute("drop table if exists testdb.testtable")
+    prestoServer.execute("drop schema if exists testdb")
+    prestoServer.execute("create schema testdb")
+    prestoServer.execute("create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary double, bonus decimal(10,4), monthlyBonus decimal(18,4), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') ")
     CarbonDataStoreCreator
       .createCarbonStore(storePath,
         s"$rootPath/integration/presto/src/test/resources/alldatatype.csv", true)
     logger.info(s"\nCarbon store is created at location: $storePath")
     cleanUp
-    prestoServer.startServer(storePath)
   }
 
   override def afterAll(): Unit = {
     prestoServer.stopServer()
+    CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
   }
 
   test("select string type with order by clause") {
@@ -213,6 +222,7 @@ class PrestoAllDataTypeLocalDictTest extends FunSuiteLike with BeforeAndAfterAll
         "SELECT ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY,BONUS FROM TESTDB.TESTTABLE " +
         "WHERE BONUS>1234 AND ID<2 GROUP BY ID,DATE,COUNTRY,NAME,PHONETYPE,SERIALNAME,SALARY," +
         "BONUS ORDER BY ID")
+    actualResult.foreach(println)
     val expectedResult: List[Map[String, Any]] = List(Map("ID" -> 1,
       "NAME" -> "anubhav",
       "BONUS" -> java.math.BigDecimal.valueOf(1234.4440).setScale(4),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
index 7bab795..17490e4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoAllDataTypeTest.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.presto.integrationtest
 
 import java.io.File
 import java.sql.Timestamp
+import java.util
 
 import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
@@ -27,7 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.presto.server.PrestoServer
 
 
@@ -75,16 +76,25 @@ class PrestoAllDataTypeTest extends FunSuiteLike with BeforeAndAfterAll {
       systemPath)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
       "Presto")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+
+    prestoServer.startServer(storePath, "testdb", map)
+    prestoServer.execute("drop table if exists testdb.testtable")
+    prestoServer.execute("drop schema if exists testdb")
+    prestoServer.execute("create schema testdb")
+    prestoServer.execute("create table testdb.testtable(ID int, date date, country varchar, name varchar, phonetype varchar, serialname varchar,salary double, bonus decimal(10,4), monthlyBonus decimal(18,4), dob timestamp, shortField smallint, iscurrentemployee boolean) with(format='CARBON') ")
     CarbonDataStoreCreator
       .createCarbonStore(storePath,
         s"$rootPath/integration/presto/src/test/resources/alldatatype.csv")
     logger.info(s"\nCarbon store is created at location: $storePath")
     cleanUp
-    prestoServer.startServer(storePath, "testdb")
   }
 
   override def afterAll(): Unit = {
     prestoServer.stopServer()
+    CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
   }
 
   test("test the result for count(*) in presto") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
index 4c2f51f..65ec8bd 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoTestNonTransactionalTableFiles.scala
@@ -51,23 +51,41 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
       systemPath)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_WRITTEN_BY_APPNAME,
       "Presto")
-    prestoServer.startServer(storePath, "sdk_output")
+    val map = new util.HashMap[String, String]()
+    map.put("hive.metastore", "file")
+    map.put("hive.metastore.catalog.dir", s"file://$storePath")
+
+    prestoServer.startServer(storePath, "sdk_output", map)
   }
 
   override def afterAll(): Unit = {
     prestoServer.stopServer()
+    CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(storePath))
   }
 
   def buildTestDataSingleFile(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
+    createTable
+
     buildTestData(3, null)
   }
 
   def buildTestDataMultipleFiles(): Any = {
     FileUtils.deleteDirectory(new File(writerPath))
+    createTable
     buildTestData(1000000, null)
   }
 
+  private def createTable = {
+    prestoServer.execute("drop table if exists sdk_output.files")
+    prestoServer.execute("drop schema if exists sdk_output")
+    prestoServer.execute("create schema sdk_output")
+    prestoServer
+      .execute(
+        "create table sdk_output.files(name varchar, age int, height double) with" +
+        "(format='CARBON') ")
+  }
+
   def buildTestData(rows: Int, options: util.Map[String, String]): Any = {
     buildTestData(rows, options, List("name"))
   }
@@ -173,8 +191,7 @@ class PrestoTestNonTransactionalTableFiles extends FunSuiteLike with BeforeAndAf
       .executeQuery("show schemas ")
     assert(actualResult
       .equals(List(Map("Schema" -> "information_schema"),
-        Map("Schema" -> "sdk_output"),
-        Map("Schema" -> "testdb"))))
+        Map("Schema" -> "sdk_output"))))
     cleanTestData()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
index 5c0c40a..34c5c0a 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/server/PrestoServer.scala
@@ -39,10 +39,11 @@ class PrestoServer {
   val CARBONDATA_CATALOG = "carbondata"
   val CARBONDATA_CONNECTOR = "carbondata"
   val CARBONDATA_SOURCE = "carbondata"
-  val logger: Logger = LoggerFactory.getLogger(this.getClass)
+  val LOGGER: Logger = LoggerFactory.getLogger(this.getClass)
 
 
   val prestoProperties: util.Map[String, String] = Map(("http-server.http.port", "8086")).asJava
+  val carbonProperties: util.Map[String, String] = new util.HashMap[String, String]()
   createSession
   lazy val queryRunner = new DistributedQueryRunner(createSession, 4, prestoProperties)
   var dbName : String = null
@@ -55,11 +56,11 @@ class PrestoServer {
    */
   def startServer(carbonStorePath: String): Unit = {
 
-    logger.info("======== STARTING PRESTO SERVER ========")
+    LOGGER.info("======== STARTING PRESTO SERVER ========")
     val queryRunner: DistributedQueryRunner = createQueryRunner(
       prestoProperties, carbonStorePath)
 
-    logger.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
+    LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
 
   /**
@@ -68,15 +69,15 @@ class PrestoServer {
    * @param carbonStorePath the store path of carbon
    * @param dbName the database name , if not a default database
    */
-  def startServer(carbonStorePath: String, dbName: String): Unit = {
+  def startServer(carbonStorePath: String, dbName: String, properties: util.Map[String, String]= new util.HashMap[String, String]()): Unit = {
 
     this.dbName = dbName
-
-    logger.info("======== STARTING PRESTO SERVER ========")
+    carbonProperties.putAll(properties)
+    LOGGER.info("======== STARTING PRESTO SERVER ========")
     val queryRunner: DistributedQueryRunner = createQueryRunner(
       prestoProperties, carbonStorePath)
 
-    logger.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
+    LOGGER.info("STARTED SERVER AT :" + queryRunner.getCoordinator.getBaseUrl)
   }
 
   /**
@@ -87,7 +88,7 @@ class PrestoServer {
     Try {
       queryRunner.installPlugin(new CarbondataPlugin)
       val carbonProperties = ImmutableMap.builder[String, String]
-        .put("carbondata-store", carbonStorePath)
+        .putAll(this.carbonProperties)
         .put("carbon.unsafe.working.memory.in.mb", "512").build
 
       // CreateCatalog will create a catalog for CarbonData in etc/catalog.
@@ -104,7 +105,7 @@ class PrestoServer {
    */
   def stopServer(): Unit = {
     queryRunner.close()
-    logger.info("***** Stopping The Server *****")
+    LOGGER.info("***** Stopping The Server *****")
   }
 
   /**
@@ -117,13 +118,28 @@ class PrestoServer {
 
     Try {
       val conn: Connection = createJdbcConnection(dbName)
-      logger.info(s"***** executing the query ***** \n $query")
+      LOGGER.info(s"***** executing the query ***** \n $query")
       val statement = conn.createStatement()
       val result: ResultSet = statement.executeQuery(query)
       convertResultSetToList(result)
     } match {
       case Success(result) => result
-      case Failure(jdbcException) => logger
+      case Failure(jdbcException) => LOGGER
+        .error(s"exception occurs${ jdbcException.getMessage } \n query failed $query")
+        throw jdbcException
+    }
+  }
+
+  def execute(query: String) = {
+
+    Try {
+      val conn: Connection = createJdbcConnection(dbName)
+      LOGGER.info(s"***** executing the query ***** \n $query")
+      val statement = conn.createStatement()
+      statement.execute(query)
+    } match {
+      case Success(result) => result
+      case Failure(jdbcException) => LOGGER
         .error(s"exception occurs${ jdbcException.getMessage } \n query failed $query")
         throw jdbcException
     }
@@ -145,7 +161,7 @@ class PrestoServer {
     val properties = new Properties
     // The database Credentials
     properties.setProperty("user", "test")
-  
+
     // STEP 2: Register JDBC driver
     Class.forName(JDBC_DRIVER)
     // STEP 3: Open a connection
@@ -180,7 +196,7 @@ class PrestoServer {
    * CreateSession will create a new session in the Server to connect and execute queries.
    */
   private def createSession: Session = {
-    logger.info("\n Creating The Presto Server Session")
+    LOGGER.info("\n Creating The Presto Server Session")
     Session.builder(new SessionPropertyManager)
       .setQueryId(new QueryIdGenerator().createNextQueryId)
       .setIdentity(new Identity("user", Optional.empty()))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index f687855..115e868 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -78,9 +78,7 @@ object CarbonDataStoreCreator {
         new CarbonTableIdentifier(dbName,
           tableName,
           UUID.randomUUID().toString))
-      //   val factFilePath: String = new File(dataFilePath).getCanonicalPath
       val storeDir: File = new File(absoluteTableIdentifier.getTablePath)
-      CarbonUtil.deleteFoldersAndFiles(storeDir)
       val table: CarbonTable = createTable(absoluteTableIdentifier, useLocalDict)
       writeDictionary(dataFilePath, table, absoluteTableIdentifier)
       val schema: CarbonDataLoadSchema = new CarbonDataLoadSchema(table)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
index 0839827..7216134 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateHiveTableWithCarbonDS.scala
@@ -50,12 +50,15 @@ class TestCreateHiveTableWithCarbonDS extends QueryTest with BeforeAndAfterAll {
   }
 
   private def verifyTable = {
-    val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source")
-    assertResult(table.schema.fields.length)(3)
-    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
-      assertResult(table.storage.locationUri.get)(new Path(s"file:$storeLocation/source").toUri)
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val table = sqlContext.sparkSession.asInstanceOf[CarbonSession].sessionState.catalog
+        .asInstanceOf[CarbonSessionCatalog].getClient().getTable("default", "source")
+      assertResult(table.schema.fields.length)(3)
+      if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+        assertResult(table.storage.locationUri.get)(new Path(s"file:$storeLocation/source").toUri)
+      }
+      assertResult(table.storage.inputFormat.get)(classOf[CarbonTableInputFormat[_]].getName)
     }
-    assertResult(table.storage.inputFormat.get)(classOf[CarbonTableInputFormat[_]].getName)
   }
 
   test("test create table and verify the hive table correctness with using carbondata") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 2a84501..92f35f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -340,22 +340,27 @@ object CarbonReflectionUtils {
   def updateCarbonSerdeInfo(): Unit = {
     val currentMirror = scala.reflect.runtime.currentMirror
     val instanceMirror = currentMirror.reflect(HiveSerDe)
-    val field = currentMirror.staticClass(HiveSerDe.getClass.getName).
+    currentMirror.staticClass(HiveSerDe.getClass.getName).
       toType.members.find { p =>
       !p.isMethod && p.name.toString.equals("serdeMap")
-    }.get.asTerm
-    val serdeMap = instanceMirror.reflectField(field).get.asInstanceOf[Map[String, HiveSerDe]]
-    val updatedSerdeMap =
-      serdeMap ++ Map[String, HiveSerDe](
-        ("org.apache.spark.sql.carbonsource", HiveSerDe(Some(
-          classOf[CarbonTableInputFormat[_]].getName),
-          Some(classOf[CarbonTableOutputFormat].getName))),
-        ("carbon", HiveSerDe(Some(
-          classOf[CarbonTableInputFormat[_]].getName),
-          Some(classOf[CarbonTableOutputFormat].getName))),
-        ("carbondata", HiveSerDe(Some(
-          classOf[CarbonTableInputFormat[_]].getName),
-          Some(classOf[CarbonTableOutputFormat].getName))))
-    instanceMirror.reflectField(field).set(updatedSerdeMap)
+    } match {
+      case Some(field) =>
+        val serdeMap =
+          instanceMirror.reflectField(field.asTerm).get.asInstanceOf[Map[String, HiveSerDe]]
+        val updatedSerdeMap =
+          serdeMap ++ Map[String, HiveSerDe](
+            ("org.apache.spark.sql.carbonsource", HiveSerDe(Some(
+              classOf[CarbonTableInputFormat[_]].getName),
+              Some(classOf[CarbonTableOutputFormat].getName))),
+            ("carbon", HiveSerDe(Some(
+              classOf[CarbonTableInputFormat[_]].getName),
+              Some(classOf[CarbonTableOutputFormat].getName))),
+            ("carbondata", HiveSerDe(Some(
+              classOf[CarbonTableInputFormat[_]].getName),
+              Some(classOf[CarbonTableOutputFormat].getName))))
+        instanceMirror.reflectField(field.asTerm).set(updatedSerdeMap)
+      case _ =>
+    }
+
   }
 }
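
The serde registration above is now tolerant of the reflective lookup failing: the field is matched as an Option and the map is rewritten only when it is present, instead of calling .get unconditionally. The same defensive pattern in plain Java, purely for illustration (names are hypothetical):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    public class ReflectionGuardSketch {
      // Only touch the map when the lookup found something,
      // instead of calling get() blindly.
      static void register(Optional<Map<String, String>> serdeMap) {
        serdeMap.ifPresent(m -> m.put("carbondata", "CarbonTableInputFormat"));
      }

      public static void main(String[] args) {
        register(Optional.empty());              // no-op, no exception
        register(Optional.of(new HashMap<>())); // entry added
      }
    }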

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index a13e8e0..12eb420 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -144,11 +144,12 @@ case class CarbonCreateTableCommand(
             } else {
               ""
             }
-          // isVisible property is added to hive table properties to differentiate between main
-          // table and datamaps(like preaggregate). It is false only for datamaps. This is added
-          // to improve the show tables performance when filtering the datamaps from main tables
+
           // synchronized to prevent concurrently creation of table with same name
           CarbonCreateTableCommand.synchronized {
+            // isVisible property is added to hive table properties to differentiate between main
+            // table and datamaps(like preaggregate). It is false only for datamaps. This is added
+            // to improve the show tables performance when filtering the datamaps from main tables
             sparkSession.sql(
               s"""CREATE TABLE $dbName.$tableName
                  |(${ rawSchema })
@@ -157,7 +158,7 @@ case class CarbonCreateTableCommand(
                  |  tableName "$tableName",
                  |  dbName "$dbName",
                  |  tablePath "$tablePath",
-                 |  path "$tablePath",
+                 |  path "${FileFactory.addSchemeIfNotExists(tablePath)}",
                  |  isExternal "$isExternal",
                  |  isTransactional "$isTransactionalTable",
                  |  isVisible "$isVisible"


[2/2] carbondata git commit: [CARBONDATA-3194] Integrating Carbon with Presto using hive connector

Posted by ja...@apache.org.
[CARBONDATA-3194] Integrating Carbon with Presto using hive connector

This PR extends CarbondataConnectorFactory from HiveConnectorFactory, so that all features of the Presto Hive connector are inherited by Carbon as well.
It simplifies the integration, so a lot of old code has been removed.

This closes #3019
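
A self-contained analogue of the approach, with generic names rather than Presto's real API (the actual change is in CarbondataConnectorFactory.java, see the diffstat below): the Carbon factory subclasses the Hive factory and inherits everything, overriding only the Carbon-specific pieces.

    class HiveConnectorFactory {
      String getName() { return "hive"; }
    }

    class CarbondataConnectorFactory extends HiveConnectorFactory {
      @Override String getName() { return "carbondata"; }
    }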


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e193df0a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e193df0a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e193df0a

Branch: refs/heads/master
Commit: e193df0a1db6237babd4c726c0ea32f4ebd5a74e
Parents: 7c4e79f
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 19 21:19:41 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 29 11:32:18 2018 +0800

----------------------------------------------------------------------
 .../core/datastore/impl/FileFactory.java        |  23 ++
 integration/presto/pom.xml                      |  19 ++
 .../presto/CarbondataColumnHandle.java          | 143 ----------
 .../carbondata/presto/CarbondataConnector.java  |  78 ------
 .../presto/CarbondataConnectorFactory.java      | 157 +++++++++--
 .../presto/CarbondataConnectorId.java           |  52 ----
 .../carbondata/presto/CarbondataErrorCode.java  |  38 ---
 .../presto/CarbondataHandleResolver.java        |  43 ---
 .../carbondata/presto/CarbondataMetadata.java   | 272 -------------------
 .../carbondata/presto/CarbondataModule.java     | 153 ++++++++---
 .../presto/CarbondataPageSourceProvider.java    | 100 ++++---
 .../carbondata/presto/CarbondataSplit.java      | 102 -------
 .../presto/CarbondataSplitManager.java          | 120 +++++---
 .../presto/CarbondataTableHandle.java           |  71 -----
 .../presto/CarbondataTableLayoutHandle.java     |  71 -----
 .../presto/CarbondataTransactionHandle.java     |  65 -----
 .../carbondata/presto/PrestoFilterUtil.java     | 126 ++++-----
 .../presto/impl/CarbonLocalMultiBlockSplit.java |  12 +-
 .../presto/impl/CarbonTableConfig.java          |  35 ---
 .../presto/impl/CarbonTableReader.java          | 226 ++-------------
 .../PrestoAllDataTypeLocalDictTest.scala        |  16 +-
 .../integrationtest/PrestoAllDataTypeTest.scala |  14 +-
 .../PrestoTestNonTransactionalTableFiles.scala  |  23 +-
 .../carbondata/presto/server/PrestoServer.scala |  42 ++-
 .../presto/util/CarbonDataStoreCreator.scala    |   2 -
 .../TestCreateHiveTableWithCarbonDS.scala       |  13 +-
 .../spark/util/CarbonReflectionUtils.scala      |  35 ++-
 .../table/CarbonCreateTableCommand.scala        |   9 +-
 28 files changed, 638 insertions(+), 1422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index a732559..25a59f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -369,6 +369,29 @@ public final class FileFactory {
   }
 
   /**
+   * Adds the file scheme to the path if it does not already have one.
+   * @param filePath path of the file
+   * @return the updated file path
+   */
+  public static String addSchemeIfNotExists(String filePath) {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case LOCAL:
+        if (filePath.startsWith("file:")) {
+          return filePath;
+        } else {
+          return new Path("file://" + filePath).toString();
+        }
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+      default:
+        return filePath;
+    }
+  }
+
+  /**
    * below method will be used to update the file path
    * for local type
    * it removes the file:/ from the path
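
A rough illustration of the new helper's contract, as a hypothetical standalone snippet
(SchemeDemo is an illustrative name, and it assumes carbondata-core is on the classpath;
the exact normalized string for local paths is whatever Hadoop's Path produces):

    import org.apache.carbondata.core.datastore.impl.FileFactory;

    public class SchemeDemo {
      public static void main(String[] args) {
        // Local paths gain an explicit "file" scheme.
        System.out.println(FileFactory.addSchemeIfNotExists("/tmp/carbondata/store"));
        // Paths that already carry a scheme come back unchanged.
        System.out.println(FileFactory.addSchemeIfNotExists("file:/tmp/carbondata/store"));
        System.out.println(FileFactory.addSchemeIfNotExists("hdfs://namenode:8020/carbondata/store"));
      }
    }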

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 9fc1ace..32b0ca7 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -409,6 +409,25 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-hive</artifactId>
+      <version>${presto.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.openjdk.jol</groupId>
+          <artifactId>jol-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.4.1</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
deleted file mode 100755
index 7152bb4..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ColumnMetadata;
-import com.facebook.presto.spi.type.Type;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-
-public class CarbondataColumnHandle implements ColumnHandle {
-  private final String connectorId;
-  private final String columnName;
-
-  public boolean isInvertedIndex() {
-    return isInvertedIndex;
-  }
-
-  private final Type columnType;
-  private final int ordinalPosition;
-  private final int keyOrdinal;
-
-  private final String columnUniqueId;
-  private final boolean isInvertedIndex;
-
-  /**
-   * Used when this column contains decimal data.
-   */
-  private int scale;
-
-  private int precision;
-
-
-  public boolean isMeasure() {
-    return isMeasure;
-  }
-
-  private final boolean isMeasure;
-
-  public int getKeyOrdinal() {
-    return keyOrdinal;
-  }
-
-  @JsonCreator public CarbondataColumnHandle(
-      @JsonProperty("connectorId") String connectorId,
-      @JsonProperty("columnName") String columnName,
-      @JsonProperty("columnType") Type columnType,
-      @JsonProperty("ordinalPosition") int ordinalPosition,
-      @JsonProperty("keyOrdinal") int keyOrdinal,
-      @JsonProperty("isMeasure") boolean isMeasure,
-      @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
-      @JsonProperty("precision") int precision,
-      @JsonProperty("scale") int scale) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null");
-    this.columnName = requireNonNull(columnName, "columnName is null");
-    this.columnType = requireNonNull(columnType, "columnType is null");
-
-    this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
-    this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
-
-    this.isMeasure = isMeasure;
-    this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
-    this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
-    this.precision = precision;
-    this.scale = scale;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public String getColumnName() {
-    return columnName;
-  }
-
-  @JsonProperty public Type getColumnType() {
-    return columnType;
-  }
-
-  @JsonProperty public int getOrdinalPosition() {
-    return ordinalPosition;
-  }
-
-  public ColumnMetadata getColumnMetadata() {
-    return new ColumnMetadata(columnName, columnType, null, false);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(connectorId, columnName);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
-    return Objects.equals(this.connectorId, other.connectorId) && Objects
-        .equals(this.columnName, other.columnName);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
-        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
-  }
-
-  @JsonProperty public int getScale() {
-    return scale;
-  }
-
-  @JsonProperty public int getPrecision() {
-    return precision;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
deleted file mode 100755
index ab56f8d..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.connector.*;
-import com.facebook.presto.spi.transaction.IsolationLevel;
-import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.log.Logger;
-
-import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
-
-public class CarbondataConnector implements Connector {
-
-  private static final Logger log = Logger.get(CarbondataConnector.class);
-
-  private final LifeCycleManager lifeCycleManager;
-  private final ConnectorMetadata metadata;
-  private final ConnectorSplitManager splitManager;
-  private final ClassLoader classLoader;
-  private final ConnectorPageSourceProvider pageSourceProvider;
-
-  public CarbondataConnector(LifeCycleManager lifeCycleManager, ConnectorMetadata metadata,
-      ConnectorSplitManager splitManager,
-      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
-    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
-    this.metadata = requireNonNull(metadata, "metadata is null");
-    this.splitManager = requireNonNull(splitManager, "splitManager is null");
-    this.classLoader = requireNonNull(classLoader, "classLoader is null");
-    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
-  }
-
-  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
-      boolean readOnly) {
-    checkConnectorSupports(READ_COMMITTED, isolationLevel);
-    return new CarbondataTransactionHandle();
-  }
-
-  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
-    return metadata;
-  }
-
-  @Override public ConnectorSplitManager getSplitManager() {
-    return splitManager;
-  }
-
-
-  @Override
-  public ConnectorPageSourceProvider getPageSourceProvider()
-  {
-    return pageSourceProvider;
-  }
-
-  @Override public final void shutdown() {
-    try {
-      lifeCycleManager.stop();
-    } catch (Exception e) {
-      log.error(e, "Error shutting down connector");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 6bd52e6..1dd5176 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -17,69 +17,178 @@
 
 package org.apache.carbondata.presto;
 
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.*;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
-import com.facebook.presto.spi.ConnectorHandleResolver;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.facebook.presto.hive.HiveConnector;
+import com.facebook.presto.hive.HiveConnectorFactory;
+import com.facebook.presto.hive.HiveMetadataFactory;
+import com.facebook.presto.hive.HiveProcedureModule;
+import com.facebook.presto.hive.HiveSchemaProperties;
+import com.facebook.presto.hive.HiveSessionProperties;
+import com.facebook.presto.hive.HiveStorageFormat;
+import com.facebook.presto.hive.HiveTableProperties;
+import com.facebook.presto.hive.HiveTransactionManager;
+import com.facebook.presto.hive.NodeVersion;
+import com.facebook.presto.hive.RebindSafeMBeanServer;
+import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
+import com.facebook.presto.hive.metastore.HiveMetastoreModule;
+import com.facebook.presto.hive.s3.HiveS3Module;
+import com.facebook.presto.hive.security.HiveSecurityModule;
+import com.facebook.presto.hive.security.PartitionsAwareAccessControl;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.PageIndexerFactory;
+import com.facebook.presto.spi.PageSorter;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.facebook.presto.spi.connector.*;
-import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorAccessControl;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import com.google.common.base.Throwables;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.event.client.EventModule;
 import io.airlift.json.JsonModule;
+import io.airlift.units.DataSize;
+import org.weakref.jmx.guice.MBeanModule;
+import sun.reflect.ConstructorAccessor;
 
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static io.airlift.configuration.ConfigBinder.configBinder;
 
 /**
  * Build Carbondata Connector
  * It will be called by CarbondataPlugin
  */
-public class CarbondataConnectorFactory implements ConnectorFactory {
+public class CarbondataConnectorFactory extends HiveConnectorFactory {
 
-  private final String name;
   private final ClassLoader classLoader;
 
   public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
-    this.name = connectorName;
+    super(connectorName, classLoader, null);
     this.classLoader = requireNonNull(classLoader, "classLoader is null");
   }
 
-  @Override public String getName() {
-    return name;
-  }
-
-  @Override public ConnectorHandleResolver getHandleResolver() {
-    return new CarbondataHandleResolver();
-  }
-
-  @Override public Connector create(String connectorId, Map<String, String> config,
+  @Override public Connector create(String catalogName, Map<String, String> config,
       ConnectorContext context) {
     requireNonNull(config, "config is null");
 
     try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      Bootstrap app = new Bootstrap(new JsonModule(),
-          new CarbondataModule(connectorId, context.getTypeManager()));
+      Bootstrap app = new Bootstrap(new EventModule(), new MBeanModule(), new JsonModule(),
+          new CarbondataModule(catalogName), new HiveS3Module(catalogName),
+          new HiveMetastoreModule(catalogName, Optional.ofNullable(null)), new HiveSecurityModule(),
+          new HiveAuthenticationModule(), new HiveProcedureModule(), binder -> {
+        javax.management.MBeanServer platformMBeanServer =
+            ManagementFactory.getPlatformMBeanServer();
+        binder.bind(javax.management.MBeanServer.class)
+            .toInstance(new RebindSafeMBeanServer(platformMBeanServer));
+        binder.bind(NodeVersion.class)
+            .toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
+        binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+        binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+        binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
+        binder.bind(PageSorter.class).toInstance(context.getPageSorter());
+        configBinder(binder).bindConfig(CarbonTableConfig.class);
+      });
 
       Injector injector =
           app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
               .initialize();
 
+      setCarbonEnum();
+
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
-      ConnectorMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      HiveMetadataFactory metadataFactory = injector.getInstance(HiveMetadataFactory.class);
+      HiveTransactionManager transactionManager =
+          injector.getInstance(HiveTransactionManager.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorPageSourceProvider connectorPageSource =
           injector.getInstance(ConnectorPageSourceProvider.class);
+      ConnectorPageSinkProvider pageSinkProvider =
+          injector.getInstance(ConnectorPageSinkProvider.class);
+      ConnectorNodePartitioningProvider connectorDistributionProvider =
+          injector.getInstance(ConnectorNodePartitioningProvider.class);
+      HiveSessionProperties hiveSessionProperties =
+          injector.getInstance(HiveSessionProperties.class);
+      HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+      ConnectorAccessControl accessControl =
+          new PartitionsAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
+      Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {
+      }));
 
-      return new CarbondataConnector(lifeCycleManager,
-          new ClassLoaderSafeConnectorMetadata(metadata, classLoader),
-          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), classLoader,
-          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader));
+      return new HiveConnector(lifeCycleManager, metadataFactory, transactionManager,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+          new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+          new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+          ImmutableSet.of(), procedures, hiveSessionProperties.getSessionProperties(),
+          HiveSchemaProperties.SCHEMA_PROPERTIES, hiveTableProperties.getTableProperties(),
+          accessControl, classLoader);
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      throwIfUnchecked(e);
+      throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Set the Carbon format enum to HiveStorageFormat, its a hack but for time being it is best
+   * choice to avoid lot of code change.
+   *
+   * @throws Exception
+   */
+  private void setCarbonEnum() throws Exception {
+    for (HiveStorageFormat format : HiveStorageFormat.values()) {
+      if (format.name().equals("CARBON")) {
+        return;
+      }
+    }
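+    // No CARBON constant yet: grab the enum's private constructor and its low-level
+    // accessor, so a new constant can be instantiated despite the reflection enum guard.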
+    Constructor<?>[] declaredConstructors = HiveStorageFormat.class.getDeclaredConstructors();
+    declaredConstructors[0].setAccessible(true);
+    Field constructorAccessorField = Constructor.class.getDeclaredField("constructorAccessor");
+    constructorAccessorField.setAccessible(true);
+    ConstructorAccessor ca =
+        (ConstructorAccessor) constructorAccessorField.get(declaredConstructors[0]);
+    if (ca == null) {
+      Method acquireConstructorAccessorMethod =
+          Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
+      acquireConstructorAccessorMethod.setAccessible(true);
+      ca = (ConstructorAccessor) acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
+    }
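+    // Build the CARBON constant: name, next ordinal, empty serde, carbon input/output
+    // formats, and a 256 MB writer memory estimate.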
+    Object instance = ca.newInstance(new Object[] { "CARBON", HiveStorageFormat.values().length, "",
+        CarbonTableInputFormat.class.getName(), CarbonTableOutputFormat.class.getName(),
+        new DataSize(256.0D, DataSize.Unit.MEGABYTE) });
+    Field values = HiveStorageFormat.class.getDeclaredField("$VALUES");
+    values.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(values, values.getModifiers() & ~Modifier.FINAL);
+
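+    // With final stripped from $VALUES, append CARBON and write the array back.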
+    HiveStorageFormat[] hiveStorageFormats =
+        new HiveStorageFormat[HiveStorageFormat.values().length + 1];
+    HiveStorageFormat[] src = (HiveStorageFormat[]) values.get(null);
+    System.arraycopy(src, 0, hiveStorageFormats, 0, src.length);
+    hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
+    values.set(null, hiveStorageFormats);
+  }
 }
\ No newline at end of file
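
For readers unfamiliar with the enum trick above, here is a hypothetical, self-contained toy
version of the same technique (EnumHackDemo and Color are illustrative names; it assumes Java 8
on HotSpot with javac-generated enums, where the synthetic $VALUES field and
sun.reflect.ConstructorAccessor exist — later JDKs encapsulate these internals):

    import java.lang.reflect.Constructor;
    import java.lang.reflect.Field;
    import java.lang.reflect.Method;
    import java.lang.reflect.Modifier;
    import sun.reflect.ConstructorAccessor;

    public class EnumHackDemo {
      enum Color { RED }

      public static void main(String[] args) throws Exception {
        // Enum constructors cannot be invoked via Constructor.newInstance,
        // so fetch the underlying ConstructorAccessor and call it directly.
        Constructor<?> ctor = Color.class.getDeclaredConstructors()[0];
        ctor.setAccessible(true);
        Field caField = Constructor.class.getDeclaredField("constructorAccessor");
        caField.setAccessible(true);
        ConstructorAccessor ca = (ConstructorAccessor) caField.get(ctor);
        if (ca == null) {
          Method acquire = Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
          acquire.setAccessible(true);
          ca = (ConstructorAccessor) acquire.invoke(ctor);
        }
        // Every enum constructor takes (String name, int ordinal) first.
        Object green = ca.newInstance(new Object[] { "GREEN", Color.values().length });
        // Strip final from the synthetic $VALUES array and append the new constant.
        Field values = Color.class.getDeclaredField("$VALUES");
        values.setAccessible(true);
        Field modifiers = Field.class.getDeclaredField("modifiers");
        modifiers.setAccessible(true);
        modifiers.setInt(values, values.getModifiers() & ~Modifier.FINAL);
        Color[] old = (Color[]) values.get(null);
        Color[] grown = new Color[old.length + 1];
        System.arraycopy(old, 0, grown, 0, old.length);
        grown[old.length] = (Color) green;
        values.set(null, grown);
        System.out.println(java.util.Arrays.toString(Color.values()));  // [RED, GREEN]
      }
    }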

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
deleted file mode 100755
index d25e569..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.inject.Inject;
-
-public class CarbondataConnectorId {
-  private final String id;
-
-  @Inject public CarbondataConnectorId(String id) {
-    this.id = requireNonNull(id, "id is null");
-  }
-
-  @Override public String toString() {
-    return id;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(id);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
deleted file mode 100644
index 45971d0..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.presto;
-
-import com.facebook.presto.spi.ErrorCode;
-import com.facebook.presto.spi.ErrorCodeSupplier;
-import com.facebook.presto.spi.ErrorType;
-
-import static com.facebook.presto.spi.ErrorType.EXTERNAL;
-import static com.facebook.presto.spi.ErrorType.INTERNAL_ERROR;
-
-public enum CarbondataErrorCode implements ErrorCodeSupplier {
-  CARBON_NOT_SUPPORT_TYPE(1, INTERNAL_ERROR), CARBON_INVALID_TYPE_VALUE(2, EXTERNAL);
-
-  private final ErrorCode errorCode;
-
-  CarbondataErrorCode(int code, ErrorType type) {
-    errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
-  }
-
-  @Override public ErrorCode toErrorCode() {
-    return errorCode;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
deleted file mode 100755
index 7c65bfd..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-public class CarbondataHandleResolver implements ConnectorHandleResolver {
-  @Override public Class<? extends ConnectorTableHandle> getTableHandleClass() {
-    return CarbondataTableHandle.class;
-  }
-
-  @Override public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
-    return CarbondataTableLayoutHandle.class;
-  }
-
-  @Override public Class<? extends ColumnHandle> getColumnHandleClass() {
-    return CarbondataColumnHandle.class;
-  }
-
-  @Override public Class<? extends ConnectorSplit> getSplitClass() {
-    return CarbondataSplit.class;
-  }
-
-  @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
-    return CarbondataTransactionHandle.class;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
deleted file mode 100755
index f56f517..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.presto.impl.CarbonTableReader;
-
-import static org.apache.carbondata.presto.Types.checkType;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ColumnMetadata;
-import com.facebook.presto.spi.ConnectorSession;
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.ConnectorTableLayout;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutResult;
-import com.facebook.presto.spi.ConnectorTableMetadata;
-import com.facebook.presto.spi.Constraint;
-import com.facebook.presto.spi.SchemaNotFoundException;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.SchemaTablePrefix;
-import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.type.BigintType;
-import com.facebook.presto.spi.type.BooleanType;
-import com.facebook.presto.spi.type.DateType;
-import com.facebook.presto.spi.type.DecimalType;
-import com.facebook.presto.spi.type.DoubleType;
-import com.facebook.presto.spi.type.IntegerType;
-import com.facebook.presto.spi.type.SmallintType;
-import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.type.VarcharType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class CarbondataMetadata implements ConnectorMetadata {
-  private final String connectorId;
-  private CarbonTableReader carbonTableReader;
-
-  private Map<String, ColumnHandle> columnHandleMap;
-
-  @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-    this.carbonTableReader = requireNonNull(reader, "client is null");
-  }
-
-
-  @Override public List<String> listSchemaNames(ConnectorSession session) {
-    return listSchemaNamesInternal();
-  }
-
-  public List<String> listSchemaNamesInternal() {
-    return carbonTableReader.getSchemaNames();
-  }
-
-  @Override
-  public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
-
-    List<String> schemaNames;
-    if (schemaNameOrNull != null) {
-      schemaNames = ImmutableList.of(schemaNameOrNull);
-    } else {
-      schemaNames = carbonTableReader.getSchemaNames();
-    }
-
-    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
-    for (String schemaName : schemaNames) {
-      for (String tableName : carbonTableReader.getTableNames(schemaName)) {
-        if (!tableName.equalsIgnoreCase(".DS_Store")) {
-          builder.add(new SchemaTableName(schemaName, tableName));
-        }
-      }
-    }
-    return builder.build();
-  }
-
-  @Override
-  public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
-      SchemaTablePrefix prefix) {
-    requireNonNull(prefix, "SchemaTablePrefix is null");
-
-    ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
-    for (SchemaTableName tableName : listTables(session, prefix)) {
-      ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
-      if (tableMetadata != null) {
-        columns.put(tableName, tableMetadata.getColumns());
-      }
-    }
-    return columns.build();
-  }
-
-  //if prefix is null. return all tables
-  //if prefix is not null, just return this table
-  private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
-    if (prefix.getSchemaName() == null) {
-      return listTables(session, prefix.getSchemaName());
-    }
-    return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
-  }
-
-  private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
-    if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
-      throw new SchemaNotFoundException(schemaTableName.getSchemaName());
-    }
-
-    CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
-
-    List<ColumnMetadata> columnsMetaList = new LinkedList<>();
-    List<CarbonColumn> carbonColumns =
-        carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
-    for (CarbonColumn col : carbonColumns) {
-      //show columns command will return these data
-      ColumnSchema columnSchema = col.getColumnSchema();
-      Type columnType = carbonDataType2SpiMapper(columnSchema);
-      String extraValues =
-          columnSchema.getEncodingList().stream().map(encoding -> encoding.toString() + " ")
-              .reduce("", String::concat);
-      ColumnMetadata columnMeta =
-          new ColumnMetadata(columnSchema.getColumnName(), columnType, "", extraValues, false);
-      columnsMetaList.add(columnMeta);
-    }
-
-    //carbondata connector's table metadata
-    return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
-  }
-
-  @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
-      ConnectorTableHandle tableHandle) {
-
-    CarbondataTableHandle handle =
-        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-    checkArgument(handle.getConnectorId().equals(connectorId),
-        "tableHandle is not for this connector");
-
-    String schemaName = handle.getSchemaTableName().getSchemaName();
-
-    if (!listSchemaNamesInternal().contains(schemaName)) {
-      throw new SchemaNotFoundException(schemaName);
-    }
-
-    //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
-    CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
-
-    ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
-    String tableName = handle.getSchemaTableName().getTableName();
-    for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
-      ColumnSchema cs = column.getColumnSchema();
-
-      Type spiType = carbonDataType2SpiMapper(cs);
-      columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
-              column.getSchemaOrdinal(), column.getKeyOrdinal(), false, cs.getColumnUniqueId(),
-              cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
-    }
-
-    for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
-      ColumnSchema cs = measure.getColumnSchema();
-      Type spiType = carbonDataType2SpiMapper(cs);
-      columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
-              cs.getSchemaOrdinal(), measure.getOrdinal(), true, cs.getColumnUniqueId(),
-              cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
-    }
-
-    columnHandleMap = columnHandles.build();
-
-    return columnHandleMap;
-  }
-
-  @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
-      ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
-
-    checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-    return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
-        .getColumnMetadata();
-  }
-
-  @Override
-  public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
-    return new CarbondataTableHandle(connectorId, tableName);
-  }
-
-  @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
-      ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
-      Optional<Set<ColumnHandle>> desiredColumns) {
-    CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
-    ConnectorTableLayout layout = new ConnectorTableLayout(
-        new CarbondataTableLayoutHandle(handle, constraint.getSummary()));
-    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
-  }
-
-  @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
-      ConnectorTableLayoutHandle handle) {
-    return new ConnectorTableLayout(handle);
-  }
-
-  @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
-      ConnectorTableHandle table) {
-    return getTableMetadataInternal(table);
-  }
-
-  public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
-    CarbondataTableHandle carbondataTableHandle =
-        checkType(table, CarbondataTableHandle.class, "table");
-    checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
-        "tableHandle is not for this connector");
-    return getTableMetadata(carbondataTableHandle.getSchemaTableName());
-  }
-
-  public static Type carbonDataType2SpiMapper(ColumnSchema columnSchema) {
-    DataType colType = columnSchema.getDataType();
-    if (colType == DataTypes.BOOLEAN) {
-      return BooleanType.BOOLEAN;
-    } else if (colType == DataTypes.SHORT) {
-      return SmallintType.SMALLINT;
-    } else if (colType == DataTypes.INT) {
-      return IntegerType.INTEGER;
-    } else if (colType == DataTypes.LONG) {
-      return BigintType.BIGINT;
-    } else if (colType == DataTypes.FLOAT || colType == DataTypes.DOUBLE) {
-      return DoubleType.DOUBLE;
-    } else if (DataTypes.isDecimal(colType)) {
-      if (columnSchema.getPrecision() > 0) {
-        return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
-      } else {
-        return DecimalType.createDecimalType();
-      }
-    } else if (colType == DataTypes.STRING) {
-      return VarcharType.VARCHAR;
-    } else if (colType == DataTypes.DATE) {
-      return DateType.DATE;
-    } else if (colType == DataTypes.TIMESTAMP) {
-      return TimestampType.TIMESTAMP;
-    } else {
-      return VarcharType.VARCHAR;
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index f9418a4..1f63b98 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -17,62 +17,153 @@
 
 package org.apache.carbondata.presto;
 
-import javax.inject.Inject;
+import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
 
-import org.apache.carbondata.presto.impl.CarbonTableConfig;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
+import com.facebook.presto.hive.CoercionPolicy;
+import com.facebook.presto.hive.DirectoryLister;
+import com.facebook.presto.hive.FileFormatDataSourceStats;
+import com.facebook.presto.hive.GenericHiveRecordCursorProvider;
+import com.facebook.presto.hive.HadoopDirectoryLister;
+import com.facebook.presto.hive.HdfsConfiguration;
+import com.facebook.presto.hive.HdfsConfigurationUpdater;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveClientModule;
+import com.facebook.presto.hive.HiveCoercionPolicy;
+import com.facebook.presto.hive.HiveConnectorId;
+import com.facebook.presto.hive.HiveEventClient;
+import com.facebook.presto.hive.HiveFileWriterFactory;
+import com.facebook.presto.hive.HiveHdfsConfiguration;
+import com.facebook.presto.hive.HiveLocationService;
+import com.facebook.presto.hive.HiveMetadataFactory;
+import com.facebook.presto.hive.HiveNodePartitioningProvider;
+import com.facebook.presto.hive.HivePageSinkProvider;
+import com.facebook.presto.hive.HivePageSourceFactory;
+import com.facebook.presto.hive.HivePartitionManager;
+import com.facebook.presto.hive.HiveRecordCursorProvider;
+import com.facebook.presto.hive.HiveSessionProperties;
+import com.facebook.presto.hive.HiveSplitManager;
+import com.facebook.presto.hive.HiveTableProperties;
+import com.facebook.presto.hive.HiveTransactionManager;
+import com.facebook.presto.hive.HiveTypeTranslator;
+import com.facebook.presto.hive.HiveWriterStats;
+import com.facebook.presto.hive.LocationService;
+import com.facebook.presto.hive.NamenodeStats;
+import com.facebook.presto.hive.OrcFileWriterConfig;
+import com.facebook.presto.hive.OrcFileWriterFactory;
+import com.facebook.presto.hive.PartitionUpdate;
+import com.facebook.presto.hive.RcFileFileWriterFactory;
+import com.facebook.presto.hive.TableParameterCodec;
+import com.facebook.presto.hive.TransactionalMetadata;
+import com.facebook.presto.hive.TypeTranslator;
+import com.facebook.presto.hive.orc.DwrfPageSourceFactory;
+import com.facebook.presto.hive.orc.OrcPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider;
+import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
+import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
 import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.ConnectorSplitManager;
-import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.type.TypeManager;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
 import com.google.inject.Binder;
-import com.google.inject.Module;
 import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
+import io.airlift.event.client.EventClient;
 
-import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
 
-public class CarbondataModule implements Module {
+import static org.weakref.jmx.ObjectNames.generatedNameOf;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+/**
+ * Binds all classes needed by this module.
+ */
+public class CarbondataModule extends HiveClientModule {
 
   private final String connectorId;
-  private final TypeManager typeManager;
 
-  public CarbondataModule(String connectorId, TypeManager typeManager) {
+  public CarbondataModule(String connectorId) {
+    super(connectorId);
     this.connectorId = requireNonNull(connectorId, "connector id is null");
-    this.typeManager = requireNonNull(typeManager, "typeManager is null");
   }
 
   @Override public void configure(Binder binder) {
-    binder.bind(TypeManager.class).toInstance(typeManager);
+    binder.bind(HiveConnectorId.class).toInstance(new HiveConnectorId(connectorId));
+    binder.bind(TypeTranslator.class).toInstance(new HiveTypeTranslator());
+    binder.bind(CoercionPolicy.class).to(HiveCoercionPolicy.class).in(Scopes.SINGLETON);
 
-    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
-    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
-    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsConfigurationUpdater.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON);
+    binder.bind(DirectoryLister.class).to(HadoopDirectoryLister.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(HiveClientConfig.class);
+
+    binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
+    binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
+
+    binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(NamenodeStats.class)
+        .as(generatedNameOf(NamenodeStats.class, connectorId));
+
+    Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder =
+        newSetBinder(binder, HiveRecordCursorProvider.class);
+    recordCursorProviderBinder.addBinding().to(ParquetRecordCursorProvider.class)
+        .in(Scopes.SINGLETON);
+    recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class)
+        .in(Scopes.SINGLETON);
+
+    binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(HiveWriterStats.class)
+        .as(generatedNameOf(HiveWriterStats.class, connectorId));
+
+    newSetBinder(binder, EventClient.class).addBinding().to(HiveEventClient.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON);
+    binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
+    binder.bind(TableParameterCodec.class).in(Scopes.SINGLETON);
+    binder.bind(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+    binder.bind(new TypeLiteral<Supplier<TransactionalMetadata>>() {
+    }).to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+    binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
     binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(ConnectorSplitManager.class)
+        .as(generatedNameOf(HiveSplitManager.class, connectorId));
     binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
         .in(Scopes.SINGLETON);
-    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
-    configBinder(binder).bindConfig(CarbonTableConfig.class);
-  }
+    binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class)
+        .in(Scopes.SINGLETON);
+
+    jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
 
-  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
-    private final TypeManager typeManager;
+    binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(FileFormatDataSourceStats.class)
+        .as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
 
-    @Inject public TypeDeserializer(TypeManager typeManager) {
-      super(Type.class);
-      this.typeManager = requireNonNull(typeManager, "typeManager is null");
-    }
+    Multibinder<HivePageSourceFactory> pageSourceFactoryBinder =
+        newSetBinder(binder, HivePageSourceFactory.class);
+    pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
 
-    @Override protected Type _deserialize(String value, DeserializationContext context) {
-      Type type = typeManager.getType(parseTypeSignature(value));
-      checkArgument(type != null, "Unknown type %s", value);
-      return type;
-    }
+    Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder =
+        newSetBinder(binder, HiveFileWriterFactory.class);
+    binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(OrcFileWriterFactory.class)
+        .as(generatedNameOf(OrcFileWriterFactory.class, connectorId));
+    configBinder(binder).bindConfig(OrcFileWriterConfig.class);
+    fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
+    fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 96024e4..d7b7266 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.presto;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
@@ -28,7 +29,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
@@ -43,63 +43,79 @@ import org.apache.carbondata.presto.impl.CarbonTableReader;
 
 import static org.apache.carbondata.presto.Types.checkType;
 
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HivePageSourceFactory;
+import com.facebook.presto.hive.HivePageSourceProvider;
+import com.facebook.presto.hive.HiveRecordCursorProvider;
+import com.facebook.presto.hive.HiveSplit;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.type.TypeManager;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-
 /**
  * Provider Class for Carbondata Page Source class.
  */
-public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+public class CarbondataPageSourceProvider extends HivePageSourceProvider {
 
-  private String connectorId;
   private CarbonTableReader carbonTableReader;
   private String queryId;
-
-  @Inject public CarbondataPageSourceProvider(CarbondataConnectorId connectorId,
+  private HdfsEnvironment hdfsEnvironment;
+
+  @Inject public CarbondataPageSourceProvider(
+      HiveClientConfig hiveClientConfig,
+      HdfsEnvironment hdfsEnvironment,
+      Set<HiveRecordCursorProvider> cursorProviders,
+      Set<HivePageSourceFactory> pageSourceFactories,
+      TypeManager typeManager,
       CarbonTableReader carbonTableReader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    super(hiveClientConfig, hdfsEnvironment, cursorProviders, pageSourceFactories, typeManager);
     this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
+    this.hdfsEnvironment = hdfsEnvironment;
   }
 
   @Override
   public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
       ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
-    this.queryId = ((CarbondataSplit)split).getQueryId();
+    HiveSplit carbonSplit =
+        checkType(split, HiveSplit.class, "split is not class HiveSplit");
+    this.queryId = carbonSplit.getSchema().getProperty("queryId");
+    if (this.queryId == null) {
+      // Fall back to the Hive page source.
+      return super.createPageSource(transactionHandle, session, split, columns);
+    }
+    Configuration configuration = this.hdfsEnvironment.getConfiguration(
+        new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
+        new Path(carbonSplit.getSchema().getProperty("tablePath")));
+    configuration = carbonTableReader.updateS3Properties(configuration);
     CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
     PrestoCarbonVectorizedRecordReader carbonRecordReader =
-        createReader(split, columns, readSupport);
+        createReader(carbonSplit, columns, readSupport, configuration);
     return new CarbondataPageSource(carbonRecordReader, columns);
   }
 
   /**
-   * @param split
-   * @param columns
-   * @param readSupport
-   * @return
+   * Creates a vectorized record reader for the given split.
    */
-  private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split,
-      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport) {
-
-    CarbondataSplit carbondataSplit =
-        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
-        "split is not for this connector");
-    QueryModel queryModel = createQueryModel(carbondataSplit, columns);
+  private PrestoCarbonVectorizedRecordReader createReader(HiveSplit carbonSplit,
+      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
+      Configuration conf) {
+    QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
     if (carbonTableReader.config.getPushRowFilter() == null ||
         carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
       queryModel.setDirectVectorFill(true);
@@ -113,14 +129,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       PrestoCarbonVectorizedRecordReader reader =
           new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
               (AbstractDetailQueryResultIterator) iterator, readSupport);
-      reader.setTaskId(carbondataSplit.getIndex());
+      reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
       return reader;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get the Query Model ", e);
-    } catch (QueryExecutionException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex.getMessage(), ex);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create reader", e);
     }
   }
 
@@ -129,30 +141,27 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
    * @param columns column handles to project
    * @return the carbon QueryModel built for the split
    */
-  private QueryModel createQueryModel(CarbondataSplit carbondataSplit,
-      List<? extends ColumnHandle> columns) {
+  private QueryModel createQueryModel(HiveSplit carbondataSplit,
+      List<? extends ColumnHandle> columns, Configuration conf) {
 
     try {
       CarbonProjection carbonProjection = getCarbonProjection(columns);
-      CarbonTable carbonTable = getCarbonTable(carbondataSplit);
-
-      Configuration conf = new Configuration();
+      CarbonTable carbonTable = getCarbonTable(carbondataSplit, conf);
       conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
       String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
       CarbonTableInputFormat
           .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
       CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
-
       conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
       conf.set("query.id", queryId);
       JobConf jobConf = new JobConf(conf);
       CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
-          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
+          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
           carbonProjection);
       TaskAttemptContextImpl hadoopAttemptContext =
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonMultiBlockSplit carbonInputSplit =
-          CarbonLocalMultiBlockSplit.convertSplit(carbondataSplit.getLocalInputSplit());
+      CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+          .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
       QueryModel queryModel =
           carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
       queryModel.setQueryId(queryId);
@@ -204,10 +213,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
   private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
     CarbonProjection carbonProjection = new CarbonProjection();
     // Convert all column handles
-    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
     for (ColumnHandle handle : columns) {
-      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
-      carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName());
+      handles.add(checkType(handle, HiveColumnHandle.class, "handle"));
+      carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
     }
     return carbonProjection;
   }
@@ -216,9 +225,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
    * @param carbonSplit hive split carrying the carbon table properties
    * @return the cached CarbonTable for the split
    */
-  private CarbonTable getCarbonTable(CarbondataSplit carbonSplit) {
-    CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(carbonSplit.getSchemaTableName());
+  private CarbonTable getCarbonTable(HiveSplit carbonSplit, Configuration configuration) {
+    CarbonTableCacheModel tableCacheModel = carbonTableReader
+        .getCarbonCache(new SchemaTableName(carbonSplit.getDatabase(), carbonSplit.getTable()),
+            carbonSplit.getSchema().getProperty("tablePath"), configuration);
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
     checkNotNull(tableCacheModel.carbonTable.getTableInfo(),

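The heart of the new page source provider is the dispatch at the top of createPageSource(): a split whose schema Properties carry a "queryId" entry was produced by the carbon split manager, and anything else is handed to the inherited HivePageSourceProvider untouched. Below is a minimal sketch of that property-based dispatch using only JDK types; PageSource, hivePageSource and carbonPageSource are illustrative stand-ins, not the Presto classes themselves:

    import java.util.Properties;

    public class PageSourceDispatchSketch {
      interface PageSource { String describe(); }

      static PageSource hivePageSource() {
        return () -> "delegated to HivePageSourceProvider";
      }

      static PageSource carbonPageSource(String queryId, long taskIndex) {
        return () -> "carbon reader, query=" + queryId + ", task=" + taskIndex;
      }

      // Mirrors createPageSource() above: the "queryId" schema property marks a
      // split built by the carbon split manager; its absence means a plain Hive
      // split, which is passed to the parent provider unchanged.
      static PageSource createPageSource(Properties splitSchema) {
        String queryId = splitSchema.getProperty("queryId");
        if (queryId == null) {
          return hivePageSource();
        }
        long taskIndex = Long.parseLong(splitSchema.getProperty("index"));
        return carbonPageSource(queryId, taskIndex);
      }

      public static void main(String[] args) {
        Properties carbon = new Properties();
        carbon.setProperty("queryId", "42");
        carbon.setProperty("index", "1");
        System.out.println(createPageSource(carbon).describe());
        System.out.println(createPageSource(new Properties()).describe());
      }
    }

Running the sketch takes the carbon branch for the tagged split and the fallback branch for the empty one, which is exactly how a mixed Hive/carbon catalog keeps serving plain Hive tables.
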
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
deleted file mode 100755
index 86e9161..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.HostAddress;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-
-public class CarbondataSplit implements ConnectorSplit {
-
-  private final String connectorId;
-  private final SchemaTableName schemaTableName;
-  private final TupleDomain<ColumnHandle> constraints;
-  private final CarbonLocalMultiBlockSplit localInputSplit;
-  private final List<CarbondataColumnConstraint> rebuildConstraints;
-  private final ImmutableList<HostAddress> addresses;
-  private final String queryId;
-  private final long index;
-
-  @JsonCreator public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
-      @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
-      @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
-      @JsonProperty("localInputSplit") CarbonLocalMultiBlockSplit localInputSplit,
-      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints,
-      @JsonProperty("queryId") String queryId,
-      @JsonProperty("index") long index) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null");
-    this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
-    this.constraints = requireNonNull(constraints, "constraints is null");
-    this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
-    this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
-    this.addresses = ImmutableList.of();
-    this.queryId = queryId;
-    this.index = index;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public SchemaTableName getSchemaTableName() {
-    return schemaTableName;
-  }
-
-  @JsonProperty public TupleDomain<ColumnHandle> getConstraints() {
-    return constraints;
-  }
-
-  @JsonProperty public CarbonLocalMultiBlockSplit getLocalInputSplit() {
-    return localInputSplit;
-  }
-
-  @JsonProperty public List<CarbondataColumnConstraint> getRebuildConstraints() {
-    return rebuildConstraints;
-  }
-
-  @Override public boolean isRemotelyAccessible() {
-    return true;
-  }
-
-  @Override public List<HostAddress> getAddresses() {
-    return addresses;
-  }
-
-  @Override public Object getInfo() {
-    return this;
-  }
-
-  @JsonProperty public String getQueryId() {
-    return queryId;
-  }
-
-  @JsonProperty public long getIndex() {
-    return index;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index eeb16f7..ded00fc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -17,8 +17,9 @@
 
 package org.apache.carbondata.presto;
 
-import java.util.List;
-import java.util.Optional;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -33,42 +34,74 @@ import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
-import static org.apache.carbondata.presto.Types.checkType;
-
-import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.hive.CoercionPolicy;
+import com.facebook.presto.hive.DirectoryLister;
+import com.facebook.presto.hive.ForHiveClient;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveSplit;
+import com.facebook.presto.hive.HiveSplitManager;
+import com.facebook.presto.hive.HiveTableLayoutHandle;
+import com.facebook.presto.hive.HiveTransactionHandle;
+import com.facebook.presto.hive.NamenodeStats;
+import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
+import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 
 /**
  * Builds CarbonTable splits,
  * filtering out irrelevant blocks.
  */
-public class CarbondataSplitManager implements ConnectorSplitManager {
+public class CarbondataSplitManager extends HiveSplitManager {
 
-  private final String connectorId;
   private final CarbonTableReader carbonTableReader;
-
-  @Inject
-  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+  private final Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider;
+  private final HdfsEnvironment hdfsEnvironment;
+
+  @Inject public CarbondataSplitManager(HiveClientConfig hiveClientConfig,
+      Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider,
+      NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, DirectoryLister directoryLister,
+      @ForHiveClient ExecutorService executorService, CoercionPolicy coercionPolicy,
+      CarbonTableReader reader) {
+    super(hiveClientConfig, metastoreProvider, namenodeStats, hdfsEnvironment, directoryLister,
+        executorService, coercionPolicy);
     this.carbonTableReader = requireNonNull(reader, "reader is null");
+    this.metastoreProvider = requireNonNull(metastoreProvider, "metastoreProvider is null");
+    this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
   }
 
   public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
-      ConnectorSession session, ConnectorTableLayoutHandle layout,
+      ConnectorSession session, ConnectorTableLayoutHandle layoutHandle,
       SplitSchedulingStrategy splitSchedulingStrategy) {
-    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
-    CarbondataTableHandle tableHandle = layoutHandle.getTable();
-    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
+    SchemaTableName schemaTableName = layout.getSchemaTableName();
+
+    // get table metadata
+    SemiTransactionalHiveMetastore metastore =
+        metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
+    Table table =
+        metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
+            .orElseThrow(() -> new TableNotFoundException(schemaTableName));
+    if (!table.getStorage().getStorageFormat().getInputFormat().contains("carbon")) {
+      return super.getSplits(transactionHandle, session, layoutHandle, splitSchedulingStrategy);
+    }
+    String location = table.getStorage().getLocation();
 
     String queryId = System.nanoTime() + "";
     QueryStatistic statistic = new QueryStatistic();
@@ -78,24 +111,39 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     statistic = new QueryStatistic();
 
     carbonTableReader.setQueryId(queryId);
-    // Packaging presto-TupleDomain into CarbondataColumnConstraint,
-    // to decouple from presto-spi Module
-    List<CarbondataColumnConstraint> rebuildConstraints =
-        getColumnConstraints(layoutHandle.getConstraint());
-
-    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    TupleDomain<HiveColumnHandle> predicate =
+        (TupleDomain<HiveColumnHandle>) layout.getCompactEffectivePredicate();
+    Configuration configuration = this.hdfsEnvironment.getConfiguration(
+        new HdfsEnvironment.HdfsContext(session, schemaTableName.getSchemaName(),
+            schemaTableName.getTableName()), new Path(location));
+    configuration = carbonTableReader.updateS3Properties(configuration);
+    CarbonTableCacheModel cache =
+        carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
     if (null != cache) {
-      Expression filters = PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint());
+      Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
       try {
+
         List<CarbonLocalMultiBlockSplit> splits =
-            carbonTableReader.getInputSplits2(cache, filters, layoutHandle.getConstraint());
+            carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
 
         ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
         long index = 0;
         for (CarbonLocalMultiBlockSplit split : splits) {
           index++;
-          cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
-              layoutHandle.getConstraint(), split, rebuildConstraints, queryId, index));
+          Properties properties = new Properties();
+          for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters()
+              .entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+          }
+          properties.setProperty("tablePath", cache.carbonTable.getTablePath());
+          properties.setProperty("carbonSplit", split.getJsonString());
+          properties.setProperty("queryId", queryId);
+          properties.setProperty("index", String.valueOf(index));
+          cSplits.add(
+              new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
+                  schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList<>(),
+                  getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
+                  new HashMap<>(), Optional.empty()));
         }
 
         statisticRecorder.logStatisticsAsTableDriver();
@@ -112,24 +160,8 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     return null;
   }
 
-  /**
-   *
-   * @param constraint
-   * @return
-   */
-  public List<CarbondataColumnConstraint> getColumnConstraints(
-      TupleDomain<ColumnHandle> constraint) {
-    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
-    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
-        .get()) {
-      CarbondataColumnHandle columnHandle =
-          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
-      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
-          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
-    }
-
-    return constraintBuilder.build();
+  private static List<HostAddress> getHostAddresses(String[] hosts) {
+    return Arrays.stream(hosts).map(HostAddress::fromString).collect(toImmutableList());
   }
 
 }

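Because HiveSplit's schema is a java.util.Properties, every carbon-specific field has to round-trip as a string: the multi-block split is serialized to JSON via CarbonLocalMultiBlockSplit.getJsonString(), and numeric fields such as the task index are re-parsed with Long.parseLong on the worker. A small self-contained sketch of that round trip; the property keys match the patch, but the values are illustrative:

    import java.util.Properties;

    public class SplitPropertiesSketch {
      public static void main(String[] args) {
        // Coordinator side (CarbondataSplitManager.getSplits): attach the
        // carbon metadata to the HiveSplit schema as plain string properties.
        Properties schema = new Properties();
        schema.setProperty("tablePath", "hdfs://ns1/warehouse/t1");         // illustrative path
        schema.setProperty("carbonSplit", "<json from getJsonString()>");   // placeholder payload
        schema.setProperty("queryId", String.valueOf(System.nanoTime()));
        schema.setProperty("index", "1");

        // Worker side (CarbondataPageSourceProvider.createReader): read the
        // same keys back; numeric fields must be re-parsed from strings.
        String tablePath = schema.getProperty("tablePath");
        long taskId = Long.parseLong(schema.getProperty("index"));
        boolean isCarbon = schema.getProperty("queryId") != null;
        System.out.println(tablePath + " carbon=" + isCarbon + " task=" + taskId);
      }
    }

Keeping the payload inside the standard HiveSplit is what allows the connector to drop its own Jackson-serialized CarbondataSplit class, deleted below.
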
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
deleted file mode 100755
index 6dbd92f..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Locale.ENGLISH;
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.SchemaTableName;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Joiner;
-
-public class CarbondataTableHandle implements ConnectorTableHandle {
-
-  private final String connectorId;
-  private final SchemaTableName schemaTableName;
-
-  @JsonCreator public CarbondataTableHandle(@JsonProperty("connectorId") String connectorId,
-      @JsonProperty("schemaTableName") SchemaTableName schemaTableName) {
-    this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
-    this.schemaTableName = schemaTableName;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public SchemaTableName getSchemaTableName() {
-    return schemaTableName;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(connectorId, schemaTableName);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataTableHandle other = (CarbondataTableHandle) obj;
-    return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName
-        .equals(other.getSchemaTableName());
-  }
-
-  @Override public String toString() {
-    return Joiner.on(":").join(connectorId, schemaTableName.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
deleted file mode 100755
index 5c50ad5..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-public class CarbondataTableLayoutHandle implements ConnectorTableLayoutHandle {
-  private final CarbondataTableHandle table;
-  private final TupleDomain<ColumnHandle> constraint;
-
-  @JsonCreator
-  public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
-      @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint) {
-    this.table = requireNonNull(table, "table is null");
-    this.constraint = requireNonNull(constraint, "constraint is null");
-  }
-
-  @JsonProperty public CarbondataTableHandle getTable() {
-    return table;
-  }
-
-  @JsonProperty public TupleDomain<ColumnHandle> getConstraint() {
-    return constraint;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-
-    CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
-    return Objects.equals(table, other.table) && Objects.equals(constraint, other.constraint);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(table, constraint);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("table", table).add("constraint", constraint).toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
deleted file mode 100755
index 40f8692..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-import java.util.UUID;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-public class CarbondataTransactionHandle implements ConnectorTransactionHandle {
-  private final UUID uuid;
-
-  public CarbondataTransactionHandle() {
-    this(UUID.randomUUID());
-  }
-
-  @JsonCreator public CarbondataTransactionHandle(@JsonProperty("uuid") UUID uuid) {
-    this.uuid = requireNonNull(uuid, "uuid is null");
-  }
-
-  @JsonProperty public UUID getUuid() {
-    return uuid;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    return Objects.equals(uuid, ((CarbondataTransactionHandle) obj).uuid);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(uuid);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("uuid", uuid).toString();
-  }
-
-}