You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/12/29 03:32:44 UTC

[2/2] carbondata git commit: [CARBONDATA-3194] Integrating Carbon with Presto using hive connector

[CARBONDATA-3194] Integrating Carbon with Presto using hive connector

This PR makes CarbondataConnectorFactory extend HiveConnectorFactory, so that all features of the Hive Presto connector are inherited by Carbon as well.
It simplifies the integration, so a lot of old code has been removed.

This closes #3019


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e193df0a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e193df0a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e193df0a

Branch: refs/heads/master
Commit: e193df0a1db6237babd4c726c0ea32f4ebd5a74e
Parents: 7c4e79f
Author: ravipesala <ra...@gmail.com>
Authored: Wed Dec 19 21:19:41 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Dec 29 11:32:18 2018 +0800

----------------------------------------------------------------------
 .../core/datastore/impl/FileFactory.java        |  23 ++
 integration/presto/pom.xml                      |  19 ++
 .../presto/CarbondataColumnHandle.java          | 143 ----------
 .../carbondata/presto/CarbondataConnector.java  |  78 ------
 .../presto/CarbondataConnectorFactory.java      | 157 +++++++++--
 .../presto/CarbondataConnectorId.java           |  52 ----
 .../carbondata/presto/CarbondataErrorCode.java  |  38 ---
 .../presto/CarbondataHandleResolver.java        |  43 ---
 .../carbondata/presto/CarbondataMetadata.java   | 272 -------------------
 .../carbondata/presto/CarbondataModule.java     | 153 ++++++++---
 .../presto/CarbondataPageSourceProvider.java    | 100 ++++---
 .../carbondata/presto/CarbondataSplit.java      | 102 -------
 .../presto/CarbondataSplitManager.java          | 120 +++++---
 .../presto/CarbondataTableHandle.java           |  71 -----
 .../presto/CarbondataTableLayoutHandle.java     |  71 -----
 .../presto/CarbondataTransactionHandle.java     |  65 -----
 .../carbondata/presto/PrestoFilterUtil.java     | 126 ++++-----
 .../presto/impl/CarbonLocalMultiBlockSplit.java |  12 +-
 .../presto/impl/CarbonTableConfig.java          |  35 ---
 .../presto/impl/CarbonTableReader.java          | 226 ++-------------
 .../PrestoAllDataTypeLocalDictTest.scala        |  16 +-
 .../integrationtest/PrestoAllDataTypeTest.scala |  14 +-
 .../PrestoTestNonTransactionalTableFiles.scala  |  23 +-
 .../carbondata/presto/server/PrestoServer.scala |  42 ++-
 .../presto/util/CarbonDataStoreCreator.scala    |   2 -
 .../TestCreateHiveTableWithCarbonDS.scala       |  13 +-
 .../spark/util/CarbonReflectionUtils.scala      |  35 ++-
 .../table/CarbonCreateTableCommand.scala        |   9 +-
 28 files changed, 638 insertions(+), 1422 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
index a732559..25a59f8 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/impl/FileFactory.java
@@ -369,6 +369,29 @@ public final class FileFactory {
   }
 
   /**
+   * Adds the scheme (file://) to a local file path if the path does not already have one.
+   * @param filePath path of file
+   * @return Updated filepath
+   */
+  public static String addSchemeIfNotExists(String filePath) {
+    FileType fileType = getFileType(filePath);
+    switch (fileType) {
+      case LOCAL:
+        if (filePath.startsWith("file:")) {
+          return filePath;
+        } else {
+          return new Path("file://" + filePath).toString();
+        }
+      case HDFS:
+      case ALLUXIO:
+      case VIEWFS:
+      case S3:
+      default:
+        return filePath;
+    }
+  }
+
+  /**
    * below method will be used to update the file path
    * for local type
    * it removes the file:/ from the path

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
index 9fc1ace..32b0ca7 100644
--- a/integration/presto/pom.xml
+++ b/integration/presto/pom.xml
@@ -409,6 +409,25 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>com.facebook.presto</groupId>
+      <artifactId>presto-hive</artifactId>
+      <version>${presto.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.openjdk.jol</groupId>
+          <artifactId>jol-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.antlr</groupId>
+          <artifactId>antlr4-runtime</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-compress</artifactId>
       <version>1.4.1</version>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
deleted file mode 100755
index 7152bb4..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ColumnMetadata;
-import com.facebook.presto.spi.type.Type;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-
-public class CarbondataColumnHandle implements ColumnHandle {
-  private final String connectorId;
-  private final String columnName;
-
-  public boolean isInvertedIndex() {
-    return isInvertedIndex;
-  }
-
-  private final Type columnType;
-  private final int ordinalPosition;
-  private final int keyOrdinal;
-
-  private final String columnUniqueId;
-  private final boolean isInvertedIndex;
-
-  /**
-   * Used when this column contains decimal data.
-   */
-  private int scale;
-
-  private int precision;
-
-
-  public boolean isMeasure() {
-    return isMeasure;
-  }
-
-  private final boolean isMeasure;
-
-  public int getKeyOrdinal() {
-    return keyOrdinal;
-  }
-
-  @JsonCreator public CarbondataColumnHandle(
-      @JsonProperty("connectorId") String connectorId,
-      @JsonProperty("columnName") String columnName,
-      @JsonProperty("columnType") Type columnType,
-      @JsonProperty("ordinalPosition") int ordinalPosition,
-      @JsonProperty("keyOrdinal") int keyOrdinal,
-      @JsonProperty("isMeasure") boolean isMeasure,
-      @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
-      @JsonProperty("precision") int precision,
-      @JsonProperty("scale") int scale) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null");
-    this.columnName = requireNonNull(columnName, "columnName is null");
-    this.columnType = requireNonNull(columnType, "columnType is null");
-
-    this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
-    this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
-
-    this.isMeasure = isMeasure;
-    this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
-    this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
-    this.precision = precision;
-    this.scale = scale;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public String getColumnName() {
-    return columnName;
-  }
-
-  @JsonProperty public Type getColumnType() {
-    return columnType;
-  }
-
-  @JsonProperty public int getOrdinalPosition() {
-    return ordinalPosition;
-  }
-
-  public ColumnMetadata getColumnMetadata() {
-    return new ColumnMetadata(columnName, columnType, null, false);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(connectorId, columnName);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
-    return Objects.equals(this.connectorId, other.connectorId) && Objects
-        .equals(this.columnName, other.columnName);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
-        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
-  }
-
-  @JsonProperty public int getScale() {
-    return scale;
-  }
-
-  @JsonProperty public int getPrecision() {
-    return precision;
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
deleted file mode 100755
index ab56f8d..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.connector.*;
-import com.facebook.presto.spi.transaction.IsolationLevel;
-import io.airlift.bootstrap.LifeCycleManager;
-import io.airlift.log.Logger;
-
-import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
-import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
-
-public class CarbondataConnector implements Connector {
-
-  private static final Logger log = Logger.get(CarbondataConnector.class);
-
-  private final LifeCycleManager lifeCycleManager;
-  private final ConnectorMetadata metadata;
-  private final ConnectorSplitManager splitManager;
-  private final ClassLoader classLoader;
-  private final ConnectorPageSourceProvider pageSourceProvider;
-
-  public CarbondataConnector(LifeCycleManager lifeCycleManager, ConnectorMetadata metadata,
-      ConnectorSplitManager splitManager,
-      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
-    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
-    this.metadata = requireNonNull(metadata, "metadata is null");
-    this.splitManager = requireNonNull(splitManager, "splitManager is null");
-    this.classLoader = requireNonNull(classLoader, "classLoader is null");
-    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
-  }
-
-  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
-      boolean readOnly) {
-    checkConnectorSupports(READ_COMMITTED, isolationLevel);
-    return new CarbondataTransactionHandle();
-  }
-
-  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
-    return metadata;
-  }
-
-  @Override public ConnectorSplitManager getSplitManager() {
-    return splitManager;
-  }
-
-
-  @Override
-  public ConnectorPageSourceProvider getPageSourceProvider()
-  {
-    return pageSourceProvider;
-  }
-
-  @Override public final void shutdown() {
-    try {
-      lifeCycleManager.stop();
-    } catch (Exception e) {
-      log.error(e, "Error shutting down connector");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 6bd52e6..1dd5176 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -17,69 +17,178 @@
 
 package org.apache.carbondata.presto;
 
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.*;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
-import com.facebook.presto.spi.ConnectorHandleResolver;
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;
+import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+
+import com.facebook.presto.hive.HiveConnector;
+import com.facebook.presto.hive.HiveConnectorFactory;
+import com.facebook.presto.hive.HiveMetadataFactory;
+import com.facebook.presto.hive.HiveProcedureModule;
+import com.facebook.presto.hive.HiveSchemaProperties;
+import com.facebook.presto.hive.HiveSessionProperties;
+import com.facebook.presto.hive.HiveStorageFormat;
+import com.facebook.presto.hive.HiveTableProperties;
+import com.facebook.presto.hive.HiveTransactionManager;
+import com.facebook.presto.hive.NodeVersion;
+import com.facebook.presto.hive.RebindSafeMBeanServer;
+import com.facebook.presto.hive.authentication.HiveAuthenticationModule;
+import com.facebook.presto.hive.metastore.HiveMetastoreModule;
+import com.facebook.presto.hive.s3.HiveS3Module;
+import com.facebook.presto.hive.security.HiveSecurityModule;
+import com.facebook.presto.hive.security.PartitionsAwareAccessControl;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.PageIndexerFactory;
+import com.facebook.presto.spi.PageSorter;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
-import com.facebook.presto.spi.connector.*;
-import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorAccessControl;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSinkProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
-import com.google.common.base.Throwables;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeNodePartitioningProvider;
+import com.facebook.presto.spi.procedure.Procedure;
+import com.facebook.presto.spi.type.TypeManager;
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
 import io.airlift.bootstrap.Bootstrap;
 import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.event.client.EventModule;
 import io.airlift.json.JsonModule;
+import io.airlift.units.DataSize;
+import org.weakref.jmx.guice.MBeanModule;
+import sun.reflect.ConstructorAccessor;
 
+import static com.google.common.base.Throwables.throwIfUnchecked;
+import static io.airlift.configuration.ConfigBinder.configBinder;
 
 /**
  * Build Carbondata Connector
  * It will be called by CarbondataPlugin
  */
-public class CarbondataConnectorFactory implements ConnectorFactory {
+public class CarbondataConnectorFactory extends HiveConnectorFactory {
 
-  private final String name;
   private final ClassLoader classLoader;
 
   public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
-    this.name = connectorName;
+    super(connectorName, classLoader, null);
     this.classLoader = requireNonNull(classLoader, "classLoader is null");
   }
 
-  @Override public String getName() {
-    return name;
-  }
-
-  @Override public ConnectorHandleResolver getHandleResolver() {
-    return new CarbondataHandleResolver();
-  }
-
-  @Override public Connector create(String connectorId, Map<String, String> config,
+  @Override public Connector create(String catalogName, Map<String, String> config,
       ConnectorContext context) {
     requireNonNull(config, "config is null");
 
     try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      Bootstrap app = new Bootstrap(new JsonModule(),
-          new CarbondataModule(connectorId, context.getTypeManager()));
+      Bootstrap app = new Bootstrap(new EventModule(), new MBeanModule(), new JsonModule(),
+          new CarbondataModule(catalogName), new HiveS3Module(catalogName),
+          new HiveMetastoreModule(catalogName, Optional.ofNullable(null)), new HiveSecurityModule(),
+          new HiveAuthenticationModule(), new HiveProcedureModule(), binder -> {
+        javax.management.MBeanServer platformMBeanServer =
+            ManagementFactory.getPlatformMBeanServer();
+        binder.bind(javax.management.MBeanServer.class)
+            .toInstance(new RebindSafeMBeanServer(platformMBeanServer));
+        binder.bind(NodeVersion.class)
+            .toInstance(new NodeVersion(context.getNodeManager().getCurrentNode().getVersion()));
+        binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+        binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+        binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory());
+        binder.bind(PageSorter.class).toInstance(context.getPageSorter());
+        configBinder(binder).bindConfig(CarbonTableConfig.class);
+      });
 
       Injector injector =
           app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
               .initialize();
 
+      setCarbonEnum();
+
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
-      ConnectorMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      HiveMetadataFactory metadataFactory = injector.getInstance(HiveMetadataFactory.class);
+      HiveTransactionManager transactionManager =
+          injector.getInstance(HiveTransactionManager.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorPageSourceProvider connectorPageSource =
           injector.getInstance(ConnectorPageSourceProvider.class);
+      ConnectorPageSinkProvider pageSinkProvider =
+          injector.getInstance(ConnectorPageSinkProvider.class);
+      ConnectorNodePartitioningProvider connectorDistributionProvider =
+          injector.getInstance(ConnectorNodePartitioningProvider.class);
+      HiveSessionProperties hiveSessionProperties =
+          injector.getInstance(HiveSessionProperties.class);
+      HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+      ConnectorAccessControl accessControl =
+          new PartitionsAwareAccessControl(injector.getInstance(ConnectorAccessControl.class));
+      Set<Procedure> procedures = injector.getInstance(Key.get(new TypeLiteral<Set<Procedure>>() {
+      }));
 
-      return new CarbondataConnector(lifeCycleManager,
-          new ClassLoaderSafeConnectorMetadata(metadata, classLoader),
-          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), classLoader,
-          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader));
+      return new HiveConnector(lifeCycleManager, metadataFactory, transactionManager,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+          new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+          new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+          ImmutableSet.of(), procedures, hiveSessionProperties.getSessionProperties(),
+          HiveSchemaProperties.SCHEMA_PROPERTIES, hiveTableProperties.getTableProperties(),
+          accessControl, classLoader);
     } catch (Exception e) {
-      throw Throwables.propagate(e);
+      throwIfUnchecked(e);
+      throw new RuntimeException(e);
     }
   }
+
+  /**
+   * Adds the CARBON format to the HiveStorageFormat enum. It's a hack, but for the time being it
+   * is the best choice to avoid a lot of code changes.
+   *
+   * @throws Exception
+   */
+  private void setCarbonEnum() throws Exception {
+    for (HiveStorageFormat format : HiveStorageFormat.values()) {
+      if (format.name().equals("CARBON")) {
+        return;
+      }
+    }
+    Constructor<?>[] declaredConstructors = HiveStorageFormat.class.getDeclaredConstructors();
+    declaredConstructors[0].setAccessible(true);
+    Field constructorAccessorField = Constructor.class.getDeclaredField("constructorAccessor");
+    constructorAccessorField.setAccessible(true);
+    ConstructorAccessor ca =
+        (ConstructorAccessor) constructorAccessorField.get(declaredConstructors[0]);
+    if (ca == null) {
+      Method acquireConstructorAccessorMethod =
+          Constructor.class.getDeclaredMethod("acquireConstructorAccessor");
+      acquireConstructorAccessorMethod.setAccessible(true);
+      ca = (ConstructorAccessor) acquireConstructorAccessorMethod.invoke(declaredConstructors[0]);
+    }
+    Object instance = ca.newInstance(new Object[] { "CARBON", HiveStorageFormat.values().length, "",
+        CarbonTableInputFormat.class.getName(), CarbonTableOutputFormat.class.getName(),
+        new DataSize(256.0D, DataSize.Unit.MEGABYTE) });
+    Field values = HiveStorageFormat.class.getDeclaredField("$VALUES");
+    values.setAccessible(true);
+    Field modifiersField = Field.class.getDeclaredField("modifiers");
+    modifiersField.setAccessible(true);
+    modifiersField.setInt(values, values.getModifiers() & ~Modifier.FINAL);
+
+    HiveStorageFormat[] hiveStorageFormats =
+        new HiveStorageFormat[HiveStorageFormat.values().length + 1];
+    HiveStorageFormat[] src = (HiveStorageFormat[]) values.get(null);
+    System.arraycopy(src, 0, hiveStorageFormats, 0, src.length);
+    hiveStorageFormats[src.length] = (HiveStorageFormat) instance;
+    values.set(null, hiveStorageFormats);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
deleted file mode 100755
index d25e569..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorId.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.google.inject.Inject;
-
-public class CarbondataConnectorId {
-  private final String id;
-
-  @Inject public CarbondataConnectorId(String id) {
-    this.id = requireNonNull(id, "id is null");
-  }
-
-  @Override public String toString() {
-    return id;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(id);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    return Objects.equals(this.id, ((CarbondataConnectorId) obj).id);
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
deleted file mode 100644
index 45971d0..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataErrorCode.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.presto;
-
-import com.facebook.presto.spi.ErrorCode;
-import com.facebook.presto.spi.ErrorCodeSupplier;
-import com.facebook.presto.spi.ErrorType;
-
-import static com.facebook.presto.spi.ErrorType.EXTERNAL;
-import static com.facebook.presto.spi.ErrorType.INTERNAL_ERROR;
-
-public enum CarbondataErrorCode implements ErrorCodeSupplier {
-  CARBON_NOT_SUPPORT_TYPE(1, INTERNAL_ERROR), CARBON_INVALID_TYPE_VALUE(2, EXTERNAL);
-
-  private final ErrorCode errorCode;
-
-  CarbondataErrorCode(int code, ErrorType type) {
-    errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
-  }
-
-  @Override public ErrorCode toErrorCode() {
-    return errorCode;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
deleted file mode 100755
index 7c65bfd..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataHandleResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import com.facebook.presto.spi.*;
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-
-public class CarbondataHandleResolver implements ConnectorHandleResolver {
-  @Override public Class<? extends ConnectorTableHandle> getTableHandleClass() {
-    return CarbondataTableHandle.class;
-  }
-
-  @Override public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
-    return CarbondataTableLayoutHandle.class;
-  }
-
-  @Override public Class<? extends ColumnHandle> getColumnHandleClass() {
-    return CarbondataColumnHandle.class;
-  }
-
-  @Override public Class<? extends ConnectorSplit> getSplitClass() {
-    return CarbondataSplit.class;
-  }
-
-  @Override public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass() {
-    return CarbondataTransactionHandle.class;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
deleted file mode 100755
index f56f517..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
-import javax.inject.Inject;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.presto.impl.CarbonTableReader;
-
-import static org.apache.carbondata.presto.Types.checkType;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ColumnMetadata;
-import com.facebook.presto.spi.ConnectorSession;
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.ConnectorTableLayout;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutResult;
-import com.facebook.presto.spi.ConnectorTableMetadata;
-import com.facebook.presto.spi.Constraint;
-import com.facebook.presto.spi.SchemaNotFoundException;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.SchemaTablePrefix;
-import com.facebook.presto.spi.connector.ConnectorMetadata;
-import com.facebook.presto.spi.type.BigintType;
-import com.facebook.presto.spi.type.BooleanType;
-import com.facebook.presto.spi.type.DateType;
-import com.facebook.presto.spi.type.DecimalType;
-import com.facebook.presto.spi.type.DoubleType;
-import com.facebook.presto.spi.type.IntegerType;
-import com.facebook.presto.spi.type.SmallintType;
-import com.facebook.presto.spi.type.TimestampType;
-import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.type.VarcharType;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-public class CarbondataMetadata implements ConnectorMetadata {
-  private final String connectorId;
-  private CarbonTableReader carbonTableReader;
-
-  private Map<String, ColumnHandle> columnHandleMap;
-
-  @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
-    this.carbonTableReader = requireNonNull(reader, "client is null");
-  }
-
-
-  @Override public List<String> listSchemaNames(ConnectorSession session) {
-    return listSchemaNamesInternal();
-  }
-
-  public List<String> listSchemaNamesInternal() {
-    return carbonTableReader.getSchemaNames();
-  }
-
-  @Override
-  public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
-
-    List<String> schemaNames;
-    if (schemaNameOrNull != null) {
-      schemaNames = ImmutableList.of(schemaNameOrNull);
-    } else {
-      schemaNames = carbonTableReader.getSchemaNames();
-    }
-
-    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
-    for (String schemaName : schemaNames) {
-      for (String tableName : carbonTableReader.getTableNames(schemaName)) {
-        if (!tableName.equalsIgnoreCase(".DS_Store")) {
-          builder.add(new SchemaTableName(schemaName, tableName));
-        }
-      }
-    }
-    return builder.build();
-  }
-
-  @Override
-  public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
-      SchemaTablePrefix prefix) {
-    requireNonNull(prefix, "SchemaTablePrefix is null");
-
-    ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
-    for (SchemaTableName tableName : listTables(session, prefix)) {
-      ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
-      if (tableMetadata != null) {
-        columns.put(tableName, tableMetadata.getColumns());
-      }
-    }
-    return columns.build();
-  }
-
-  //if prefix is null. return all tables
-  //if prefix is not null, just return this table
-  private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
-    if (prefix.getSchemaName() == null) {
-      return listTables(session, prefix.getSchemaName());
-    }
-    return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
-  }
-
-  private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
-    if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
-      throw new SchemaNotFoundException(schemaTableName.getSchemaName());
-    }
-
-    CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
-
-    List<ColumnMetadata> columnsMetaList = new LinkedList<>();
-    List<CarbonColumn> carbonColumns =
-        carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
-    for (CarbonColumn col : carbonColumns) {
-      //show columns command will return these data
-      ColumnSchema columnSchema = col.getColumnSchema();
-      Type columnType = carbonDataType2SpiMapper(columnSchema);
-      String extraValues =
-          columnSchema.getEncodingList().stream().map(encoding -> encoding.toString() + " ")
-              .reduce("", String::concat);
-      ColumnMetadata columnMeta =
-          new ColumnMetadata(columnSchema.getColumnName(), columnType, "", extraValues, false);
-      columnsMetaList.add(columnMeta);
-    }
-
-    //carbondata connector's table metadata
-    return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
-  }
-
-  @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
-      ConnectorTableHandle tableHandle) {
-
-    CarbondataTableHandle handle =
-        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-    checkArgument(handle.getConnectorId().equals(connectorId),
-        "tableHandle is not for this connector");
-
-    String schemaName = handle.getSchemaTableName().getSchemaName();
-
-    if (!listSchemaNamesInternal().contains(schemaName)) {
-      throw new SchemaNotFoundException(schemaName);
-    }
-
-    //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
-    CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
-
-    ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
-    String tableName = handle.getSchemaTableName().getTableName();
-    for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
-      ColumnSchema cs = column.getColumnSchema();
-
-      Type spiType = carbonDataType2SpiMapper(cs);
-      columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
-              column.getSchemaOrdinal(), column.getKeyOrdinal(), false, cs.getColumnUniqueId(),
-              cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
-    }
-
-    for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
-      ColumnSchema cs = measure.getColumnSchema();
-      Type spiType = carbonDataType2SpiMapper(cs);
-      columnHandles.put(cs.getColumnName(),
-          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType,
-              cs.getSchemaOrdinal(), measure.getOrdinal(), true, cs.getColumnUniqueId(),
-              cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
-    }
-
-    columnHandleMap = columnHandles.build();
-
-    return columnHandleMap;
-  }
-
-  @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
-      ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
-
-    checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
-    return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
-        .getColumnMetadata();
-  }
-
-  @Override
-  public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
-    return new CarbondataTableHandle(connectorId, tableName);
-  }
-
-  @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
-      ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
-      Optional<Set<ColumnHandle>> desiredColumns) {
-    CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
-    ConnectorTableLayout layout = new ConnectorTableLayout(
-        new CarbondataTableLayoutHandle(handle, constraint.getSummary()));
-    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
-  }
-
-  @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
-      ConnectorTableLayoutHandle handle) {
-    return new ConnectorTableLayout(handle);
-  }
-
-  @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
-      ConnectorTableHandle table) {
-    return getTableMetadataInternal(table);
-  }
-
-  public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
-    CarbondataTableHandle carbondataTableHandle =
-        checkType(table, CarbondataTableHandle.class, "table");
-    checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
-        "tableHandle is not for this connector");
-    return getTableMetadata(carbondataTableHandle.getSchemaTableName());
-  }
-
-  public static Type carbonDataType2SpiMapper(ColumnSchema columnSchema) {
-    DataType colType = columnSchema.getDataType();
-    if (colType == DataTypes.BOOLEAN) {
-      return BooleanType.BOOLEAN;
-    } else if (colType == DataTypes.SHORT) {
-      return SmallintType.SMALLINT;
-    } else if (colType == DataTypes.INT) {
-      return IntegerType.INTEGER;
-    } else if (colType == DataTypes.LONG) {
-      return BigintType.BIGINT;
-    } else if (colType == DataTypes.FLOAT || colType == DataTypes.DOUBLE) {
-      return DoubleType.DOUBLE;
-    } else if (DataTypes.isDecimal(colType)) {
-      if (columnSchema.getPrecision() > 0) {
-        return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
-      } else {
-        return DecimalType.createDecimalType();
-      }
-    } else if (colType == DataTypes.STRING) {
-      return VarcharType.VARCHAR;
-    } else if (colType == DataTypes.DATE) {
-      return DateType.DATE;
-    } else if (colType == DataTypes.TIMESTAMP) {
-      return TimestampType.TIMESTAMP;
-    } else {
-      return VarcharType.VARCHAR;
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
index f9418a4..1f63b98 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -17,62 +17,153 @@
 
 package org.apache.carbondata.presto;
 
-import javax.inject.Inject;
+import java.util.function.Supplier;
 
 import static java.util.Objects.requireNonNull;
 
-import org.apache.carbondata.presto.impl.CarbonTableConfig;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
+import com.facebook.presto.hive.CoercionPolicy;
+import com.facebook.presto.hive.DirectoryLister;
+import com.facebook.presto.hive.FileFormatDataSourceStats;
+import com.facebook.presto.hive.GenericHiveRecordCursorProvider;
+import com.facebook.presto.hive.HadoopDirectoryLister;
+import com.facebook.presto.hive.HdfsConfiguration;
+import com.facebook.presto.hive.HdfsConfigurationUpdater;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveClientModule;
+import com.facebook.presto.hive.HiveCoercionPolicy;
+import com.facebook.presto.hive.HiveConnectorId;
+import com.facebook.presto.hive.HiveEventClient;
+import com.facebook.presto.hive.HiveFileWriterFactory;
+import com.facebook.presto.hive.HiveHdfsConfiguration;
+import com.facebook.presto.hive.HiveLocationService;
+import com.facebook.presto.hive.HiveMetadataFactory;
+import com.facebook.presto.hive.HiveNodePartitioningProvider;
+import com.facebook.presto.hive.HivePageSinkProvider;
+import com.facebook.presto.hive.HivePageSourceFactory;
+import com.facebook.presto.hive.HivePartitionManager;
+import com.facebook.presto.hive.HiveRecordCursorProvider;
+import com.facebook.presto.hive.HiveSessionProperties;
+import com.facebook.presto.hive.HiveSplitManager;
+import com.facebook.presto.hive.HiveTableProperties;
+import com.facebook.presto.hive.HiveTransactionManager;
+import com.facebook.presto.hive.HiveTypeTranslator;
+import com.facebook.presto.hive.HiveWriterStats;
+import com.facebook.presto.hive.LocationService;
+import com.facebook.presto.hive.NamenodeStats;
+import com.facebook.presto.hive.OrcFileWriterConfig;
+import com.facebook.presto.hive.OrcFileWriterFactory;
+import com.facebook.presto.hive.PartitionUpdate;
+import com.facebook.presto.hive.RcFileFileWriterFactory;
+import com.facebook.presto.hive.TableParameterCodec;
+import com.facebook.presto.hive.TransactionalMetadata;
+import com.facebook.presto.hive.TypeTranslator;
+import com.facebook.presto.hive.orc.DwrfPageSourceFactory;
+import com.facebook.presto.hive.orc.OrcPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetPageSourceFactory;
+import com.facebook.presto.hive.parquet.ParquetRecordCursorProvider;
+import com.facebook.presto.hive.rcfile.RcFilePageSourceFactory;
+import com.facebook.presto.spi.connector.ConnectorNodePartitioningProvider;
+import com.facebook.presto.spi.connector.ConnectorPageSinkProvider;
 import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.ConnectorSplitManager;
-import com.facebook.presto.spi.type.Type;
-import com.facebook.presto.spi.type.TypeManager;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
 import com.google.inject.Binder;
-import com.google.inject.Module;
 import com.google.inject.Scopes;
+import com.google.inject.TypeLiteral;
+import com.google.inject.multibindings.Multibinder;
+import io.airlift.event.client.EventClient;
 
-import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
-import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
 
-public class CarbondataModule implements Module {
+import static org.weakref.jmx.ObjectNames.generatedNameOf;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+/**
+ * Binds the Hive connector classes, using Carbondata's split manager and page source provider.
+ */
+public class CarbondataModule extends HiveClientModule {
 
   private final String connectorId;
-  private final TypeManager typeManager;
 
-  public CarbondataModule(String connectorId, TypeManager typeManager) {
+  public CarbondataModule(String connectorId) {
+    super(connectorId);
     this.connectorId = requireNonNull(connectorId, "connector id is null");
-    this.typeManager = requireNonNull(typeManager, "typeManager is null");
   }
 
   @Override public void configure(Binder binder) {
-    binder.bind(TypeManager.class).toInstance(typeManager);
+    binder.bind(HiveConnectorId.class).toInstance(new HiveConnectorId(connectorId));
+    binder.bind(TypeTranslator.class).toInstance(new HiveTypeTranslator());
+    binder.bind(CoercionPolicy.class).to(HiveCoercionPolicy.class).in(Scopes.SINGLETON);
 
-    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
-    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
-    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsConfigurationUpdater.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsConfiguration.class).to(HiveHdfsConfiguration.class).in(Scopes.SINGLETON);
+    binder.bind(HdfsEnvironment.class).in(Scopes.SINGLETON);
+    binder.bind(DirectoryLister.class).to(HadoopDirectoryLister.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(HiveClientConfig.class);
+
+    binder.bind(HiveSessionProperties.class).in(Scopes.SINGLETON);
+    binder.bind(HiveTableProperties.class).in(Scopes.SINGLETON);
+
+    binder.bind(NamenodeStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(NamenodeStats.class)
+        .as(generatedNameOf(NamenodeStats.class, connectorId));
+
+    Multibinder<HiveRecordCursorProvider> recordCursorProviderBinder =
+        newSetBinder(binder, HiveRecordCursorProvider.class);
+    recordCursorProviderBinder.addBinding().to(ParquetRecordCursorProvider.class)
+        .in(Scopes.SINGLETON);
+    recordCursorProviderBinder.addBinding().to(GenericHiveRecordCursorProvider.class)
+        .in(Scopes.SINGLETON);
+
+    binder.bind(HiveWriterStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(HiveWriterStats.class)
+        .as(generatedNameOf(HiveWriterStats.class, connectorId));
+
+    newSetBinder(binder, EventClient.class).addBinding().to(HiveEventClient.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(HivePartitionManager.class).in(Scopes.SINGLETON);
+    binder.bind(LocationService.class).to(HiveLocationService.class).in(Scopes.SINGLETON);
+    binder.bind(TableParameterCodec.class).in(Scopes.SINGLETON);
+    binder.bind(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+    binder.bind(new TypeLiteral<Supplier<TransactionalMetadata>>() {
+    }).to(HiveMetadataFactory.class).in(Scopes.SINGLETON);
+    binder.bind(HiveTransactionManager.class).in(Scopes.SINGLETON);
     binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(ConnectorSplitManager.class)
+        .as(generatedNameOf(HiveSplitManager.class, connectorId));
     binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
         .in(Scopes.SINGLETON);
-    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
-    configBinder(binder).bindConfig(CarbonTableConfig.class);
-  }
+    binder.bind(ConnectorPageSinkProvider.class).to(HivePageSinkProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorNodePartitioningProvider.class).to(HiveNodePartitioningProvider.class)
+        .in(Scopes.SINGLETON);
+
+    jsonCodecBinder(binder).bindJsonCodec(PartitionUpdate.class);
 
-  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
-    private final TypeManager typeManager;
+    binder.bind(FileFormatDataSourceStats.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(FileFormatDataSourceStats.class)
+        .as(generatedNameOf(FileFormatDataSourceStats.class, connectorId));
 
-    @Inject public TypeDeserializer(TypeManager typeManager) {
-      super(Type.class);
-      this.typeManager = requireNonNull(typeManager, "typeManager is null");
-    }
+    Multibinder<HivePageSourceFactory> pageSourceFactoryBinder =
+        newSetBinder(binder, HivePageSourceFactory.class);
+    pageSourceFactoryBinder.addBinding().to(OrcPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(DwrfPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(ParquetPageSourceFactory.class).in(Scopes.SINGLETON);
+    pageSourceFactoryBinder.addBinding().to(RcFilePageSourceFactory.class).in(Scopes.SINGLETON);
 
-    @Override protected Type _deserialize(String value, DeserializationContext context) {
-      Type type = typeManager.getType(parseTypeSignature(value));
-      checkArgument(type != null, "Unknown type %s", value);
-      return type;
-    }
+    Multibinder<HiveFileWriterFactory> fileWriterFactoryBinder =
+        newSetBinder(binder, HiveFileWriterFactory.class);
+    binder.bind(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
+    newExporter(binder).export(OrcFileWriterFactory.class)
+        .as(generatedNameOf(OrcFileWriterFactory.class, connectorId));
+    configBinder(binder).bindConfig(OrcFileWriterConfig.class);
+    fileWriterFactoryBinder.addBinding().to(OrcFileWriterFactory.class).in(Scopes.SINGLETON);
+    fileWriterFactoryBinder.addBinding().to(RcFileFileWriterFactory.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
index 96024e4..d7b7266 100644
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.presto;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
 
@@ -28,7 +29,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.executor.QueryExecutor;
 import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
-import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.model.QueryModel;
 import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
@@ -43,63 +43,79 @@ import org.apache.carbondata.presto.impl.CarbonTableReader;
 
 import static org.apache.carbondata.presto.Types.checkType;
 
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HivePageSourceFactory;
+import com.facebook.presto.hive.HivePageSourceProvider;
+import com.facebook.presto.hive.HiveRecordCursorProvider;
+import com.facebook.presto.hive.HiveSplit;
 import com.facebook.presto.spi.ColumnHandle;
 import com.facebook.presto.spi.ConnectorPageSource;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.SchemaTableName;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.type.TypeManager;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Inject;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-
 /**
  * Provider Class for Carbondata Page Source class.
  */
-public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+public class CarbondataPageSourceProvider extends HivePageSourceProvider {
 
-  private String connectorId;
   private CarbonTableReader carbonTableReader;
   private String queryId ;
-
-  @Inject public CarbondataPageSourceProvider(CarbondataConnectorId connectorId,
+  private HdfsEnvironment hdfsEnvironment;
+
+  @Inject public CarbondataPageSourceProvider(
+      HiveClientConfig hiveClientConfig,
+      HdfsEnvironment hdfsEnvironment,
+      Set<HiveRecordCursorProvider> cursorProviders,
+      Set<HivePageSourceFactory> pageSourceFactories,
+      TypeManager typeManager,
       CarbonTableReader carbonTableReader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    super(hiveClientConfig, hdfsEnvironment, cursorProviders, pageSourceFactories, typeManager);
     this.carbonTableReader = requireNonNull(carbonTableReader, "carbonTableReader is null");
+    this.hdfsEnvironment = hdfsEnvironment;
   }
 
   @Override
   public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
       ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
-    this.queryId = ((CarbondataSplit)split).getQueryId();
+    HiveSplit carbonSplit =
+        checkType(split, HiveSplit.class, "split is not class HiveSplit");
+    this.queryId = carbonSplit.getSchema().getProperty("queryId");
+    if (this.queryId == null) {
+      // The split has no "queryId" property, so delegate to the Hive page source provider.
+      return super.createPageSource(transactionHandle, session, split, columns);
+    }
+    Configuration configuration = this.hdfsEnvironment.getConfiguration(
+        new HdfsEnvironment.HdfsContext(session, carbonSplit.getDatabase(), carbonSplit.getTable()),
+        new Path(carbonSplit.getSchema().getProperty("tablePath")));
+    configuration = carbonTableReader.updateS3Properties(configuration);
     CarbonDictionaryDecodeReadSupport readSupport = new CarbonDictionaryDecodeReadSupport();
     PrestoCarbonVectorizedRecordReader carbonRecordReader =
-        createReader(split, columns, readSupport);
+        createReader(carbonSplit, columns, readSupport, configuration);
     return new CarbondataPageSource(carbonRecordReader, columns);
   }
 
   /**
-   * @param split
-   * @param columns
-   * @param readSupport
-   * @return
+   * Creates the vectorized record reader for the given split and projected columns.
    */
-  private PrestoCarbonVectorizedRecordReader createReader(ConnectorSplit split,
-      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport) {
-
-    CarbondataSplit carbondataSplit =
-        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(carbondataSplit.getConnectorId().equals(connectorId),
-        "split is not for this connector");
-    QueryModel queryModel = createQueryModel(carbondataSplit, columns);
+  private PrestoCarbonVectorizedRecordReader createReader(HiveSplit carbonSplit,
+      List<? extends ColumnHandle> columns, CarbonDictionaryDecodeReadSupport readSupport,
+      Configuration conf) {
+    QueryModel queryModel = createQueryModel(carbonSplit, columns, conf);
     if (carbonTableReader.config.getPushRowFilter() == null ||
         carbonTableReader.config.getPushRowFilter().equalsIgnoreCase("false")) {
       queryModel.setDirectVectorFill(true);
@@ -113,14 +129,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
       PrestoCarbonVectorizedRecordReader reader =
           new PrestoCarbonVectorizedRecordReader(queryExecutor, queryModel,
               (AbstractDetailQueryResultIterator) iterator, readSupport);
-      reader.setTaskId(carbondataSplit.getIndex());
+      reader.setTaskId(Long.parseLong(carbonSplit.getSchema().getProperty("index")));
       return reader;
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get the Query Model ", e);
-    } catch (QueryExecutionException e) {
-      throw new RuntimeException(e.getMessage(), e);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex.getMessage(), ex);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to create reader ", e);
     }
   }
 
@@ -129,30 +141,27 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
    * @param columns
    * @return
    */
-  private QueryModel createQueryModel(CarbondataSplit carbondataSplit,
-      List<? extends ColumnHandle> columns) {
+  private QueryModel createQueryModel(HiveSplit carbondataSplit,
+      List<? extends ColumnHandle> columns, Configuration conf) {
 
     try {
       CarbonProjection carbonProjection = getCarbonProjection(columns);
-      CarbonTable carbonTable = getCarbonTable(carbondataSplit);
-
-      Configuration conf = new Configuration();
+      CarbonTable carbonTable = getCarbonTable(carbondataSplit, conf);
       conf.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, "");
       String carbonTablePath = carbonTable.getAbsoluteTableIdentifier().getTablePath();
       CarbonTableInputFormat
           .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
       CarbonTableInputFormat.setTableInfo(conf, carbonTable.getTableInfo());
-
       conf.set(CarbonTableInputFormat.INPUT_DIR, carbonTablePath);
       conf.set("query.id", queryId);
       JobConf jobConf = new JobConf(conf);
       CarbonTableInputFormat carbonTableInputFormat = createInputFormat(jobConf, carbonTable,
-          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getConstraints()),
+          PrestoFilterUtil.parseFilterExpression(carbondataSplit.getEffectivePredicate()),
           carbonProjection);
       TaskAttemptContextImpl hadoopAttemptContext =
           new TaskAttemptContextImpl(jobConf, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
-      CarbonMultiBlockSplit carbonInputSplit =
-          CarbonLocalMultiBlockSplit.convertSplit(carbondataSplit.getLocalInputSplit());
+      CarbonMultiBlockSplit carbonInputSplit = CarbonLocalMultiBlockSplit
+          .convertSplit(carbondataSplit.getSchema().getProperty("carbonSplit"));
       QueryModel queryModel =
           carbonTableInputFormat.createQueryModel(carbonInputSplit, hadoopAttemptContext);
       queryModel.setQueryId(queryId);
@@ -204,10 +213,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
   private CarbonProjection getCarbonProjection(List<? extends ColumnHandle> columns) {
     CarbonProjection carbonProjection = new CarbonProjection();
     // Convert all columns handles
-    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    ImmutableList.Builder<HiveColumnHandle> handles = ImmutableList.builder();
     for (ColumnHandle handle : columns) {
-      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
-      carbonProjection.addColumn(((CarbondataColumnHandle) handle).getColumnName());
+      handles.add(checkType(handle, HiveColumnHandle.class, "handle"));
+      carbonProjection.addColumn(((HiveColumnHandle) handle).getName());
     }
     return carbonProjection;
   }
@@ -216,9 +225,10 @@ public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider
    * @param carbonSplit
    * @return
    */
-  private CarbonTable getCarbonTable(CarbondataSplit carbonSplit) {
-    CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(carbonSplit.getSchemaTableName());
+  private CarbonTable getCarbonTable(HiveSplit carbonSplit, Configuration configuration) {
+    CarbonTableCacheModel tableCacheModel = carbonTableReader
+        .getCarbonCache(new SchemaTableName(carbonSplit.getDatabase(), carbonSplit.getTable()),
+            carbonSplit.getSchema().getProperty("tablePath"), configuration);
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
     checkNotNull(tableCacheModel.carbonTable.getTableInfo(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
deleted file mode 100755
index 86e9161..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplit.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorSplit;
-import com.facebook.presto.spi.HostAddress;
-import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-
-public class CarbondataSplit implements ConnectorSplit {
-
-  private final String connectorId;
-  private final SchemaTableName schemaTableName;
-  private final TupleDomain<ColumnHandle> constraints;
-  private final CarbonLocalMultiBlockSplit localInputSplit;
-  private final List<CarbondataColumnConstraint> rebuildConstraints;
-  private final ImmutableList<HostAddress> addresses;
-  private final String queryId;
-  private final long index;
-
-  @JsonCreator public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
-      @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
-      @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
-      @JsonProperty("localInputSplit") CarbonLocalMultiBlockSplit localInputSplit,
-      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints,
-      @JsonProperty("queryId") String queryId,
-      @JsonProperty("index") long index) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null");
-    this.schemaTableName = requireNonNull(schemaTableName, "schemaTable is null");
-    this.constraints = requireNonNull(constraints, "constraints is null");
-    this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
-    this.rebuildConstraints = requireNonNull(rebuildConstraints, "rebuildConstraints is null");
-    this.addresses = ImmutableList.of();
-    this.queryId = queryId;
-    this.index = index;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public SchemaTableName getSchemaTableName() {
-    return schemaTableName;
-  }
-
-  @JsonProperty public TupleDomain<ColumnHandle> getConstraints() {
-    return constraints;
-  }
-
-  @JsonProperty public CarbonLocalMultiBlockSplit getLocalInputSplit() {
-    return localInputSplit;
-  }
-
-  @JsonProperty public List<CarbondataColumnConstraint> getRebuildConstraints() {
-    return rebuildConstraints;
-  }
-
-  @Override public boolean isRemotelyAccessible() {
-    return true;
-  }
-
-  @Override public List<HostAddress> getAddresses() {
-    return addresses;
-  }
-
-  @Override public Object getInfo() {
-    return this;
-  }
-
-  @JsonProperty public String getQueryId() {
-    return queryId;
-  }
-
-  @JsonProperty public long getIndex() {
-    return index;
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
index eeb16f7..ded00fc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -17,8 +17,9 @@
 
 package org.apache.carbondata.presto;
 
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -33,42 +34,74 @@ import org.apache.carbondata.presto.impl.CarbonLocalMultiBlockSplit;
 import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
 import org.apache.carbondata.presto.impl.CarbonTableReader;
 
-import static org.apache.carbondata.presto.Types.checkType;
-
-import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.hive.CoercionPolicy;
+import com.facebook.presto.hive.DirectoryLister;
+import com.facebook.presto.hive.ForHiveClient;
+import com.facebook.presto.hive.HdfsEnvironment;
+import com.facebook.presto.hive.HiveClientConfig;
+import com.facebook.presto.hive.HiveColumnHandle;
+import com.facebook.presto.hive.HiveSplit;
+import com.facebook.presto.hive.HiveSplitManager;
+import com.facebook.presto.hive.HiveTableLayoutHandle;
+import com.facebook.presto.hive.HiveTransactionHandle;
+import com.facebook.presto.hive.NamenodeStats;
+import com.facebook.presto.hive.metastore.SemiTransactionalHiveMetastore;
+import com.facebook.presto.hive.metastore.Table;
 import com.facebook.presto.spi.ConnectorSession;
 import com.facebook.presto.spi.ConnectorSplit;
 import com.facebook.presto.spi.ConnectorSplitSource;
 import com.facebook.presto.spi.ConnectorTableLayoutHandle;
 import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.HostAddress;
 import com.facebook.presto.spi.SchemaTableName;
-import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.TableNotFoundException;
 import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
 import com.facebook.presto.spi.predicate.TupleDomain;
 import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
+import static com.google.common.collect.ImmutableList.toImmutableList;
 
 /**
  * Build Carbontable splits
  * filtering irrelevant blocks
  */
-public class CarbondataSplitManager implements ConnectorSplitManager {
+public class CarbondataSplitManager extends HiveSplitManager {
 
-  private final String connectorId;
   private final CarbonTableReader carbonTableReader;
-
-  @Inject
-  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+  private final Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider;
+  private final HdfsEnvironment hdfsEnvironment;
+
+  @Inject public CarbondataSplitManager(HiveClientConfig hiveClientConfig,
+      Function<HiveTransactionHandle, SemiTransactionalHiveMetastore> metastoreProvider,
+      NamenodeStats namenodeStats, HdfsEnvironment hdfsEnvironment, DirectoryLister directoryLister,
+      @ForHiveClient ExecutorService executorService, CoercionPolicy coercionPolicy,
+      CarbonTableReader reader) {
+    super(hiveClientConfig, metastoreProvider, namenodeStats, hdfsEnvironment, directoryLister,
+        executorService, coercionPolicy);
     this.carbonTableReader = requireNonNull(reader, "client is null");
+    this.metastoreProvider = requireNonNull(metastoreProvider, "metastore is null");
+    this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
   }
 
   public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
-      ConnectorSession session, ConnectorTableLayoutHandle layout,
+      ConnectorSession session, ConnectorTableLayoutHandle layoutHandle,
       SplitSchedulingStrategy splitSchedulingStrategy) {
-    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
-    CarbondataTableHandle tableHandle = layoutHandle.getTable();
-    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    HiveTableLayoutHandle layout = (HiveTableLayoutHandle) layoutHandle;
+    SchemaTableName schemaTableName = layout.getSchemaTableName();
+
+    // get table metadata
+    SemiTransactionalHiveMetastore metastore =
+        metastoreProvider.apply((HiveTransactionHandle) transactionHandle);
+    Table table =
+        metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
+            .orElseThrow(() -> new TableNotFoundException(schemaTableName));
+    if (!table.getStorage().getStorageFormat().getInputFormat().contains("carbon")) {
+      return super.getSplits(transactionHandle, session, layoutHandle, splitSchedulingStrategy);
+    }
+    String location = table.getStorage().getLocation();
 
     String queryId = System.nanoTime() + "";
     QueryStatistic statistic = new QueryStatistic();
@@ -78,24 +111,39 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     statistic = new QueryStatistic();
 
     carbonTableReader.setQueryId(queryId);
-    // Packaging presto-TupleDomain into CarbondataColumnConstraint,
-    // to decouple from presto-spi Module
-    List<CarbondataColumnConstraint> rebuildConstraints =
-        getColumnConstraints(layoutHandle.getConstraint());
-
-    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    TupleDomain<HiveColumnHandle> predicate =
+        (TupleDomain<HiveColumnHandle>) layout.getCompactEffectivePredicate();
+    Configuration configuration = this.hdfsEnvironment.getConfiguration(
+        new HdfsEnvironment.HdfsContext(session, schemaTableName.getSchemaName(),
+            schemaTableName.getTableName()), new Path(location));
+    configuration = carbonTableReader.updateS3Properties(configuration);
+    CarbonTableCacheModel cache =
+        carbonTableReader.getCarbonCache(schemaTableName, location, configuration);
     if (null != cache) {
-      Expression filters = PrestoFilterUtil.parseFilterExpression(layoutHandle.getConstraint());
+      Expression filters = PrestoFilterUtil.parseFilterExpression(predicate);
       try {
+
         List<CarbonLocalMultiBlockSplit> splits =
-            carbonTableReader.getInputSplits2(cache, filters, layoutHandle.getConstraint());
+            carbonTableReader.getInputSplits2(cache, filters, predicate, configuration);
 
         ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
         long index = 0;
         for (CarbonLocalMultiBlockSplit split : splits) {
           index++;
-          cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
-              layoutHandle.getConstraint(), split, rebuildConstraints, queryId, index));
+          Properties properties = new Properties();
+          for (Map.Entry<String, String> entry : table.getStorage().getSerdeParameters()
+              .entrySet()) {
+            properties.setProperty(entry.getKey(), entry.getValue());
+          }
+          properties.setProperty("tablePath", cache.carbonTable.getTablePath());
+          properties.setProperty("carbonSplit", split.getJsonString());
+          properties.setProperty("queryId", queryId);
+          properties.setProperty("index", String.valueOf(index));
+          cSplits.add(
+              new HiveSplit(schemaTableName.getSchemaName(), schemaTableName.getTableName(),
+                  schemaTableName.getTableName(), "", 0, 0, 0, properties, new ArrayList(),
+                  getHostAddresses(split.getLocations()), OptionalInt.empty(), false, predicate,
+                  new HashMap<>(), Optional.empty()));
         }
 
         statisticRecorder.logStatisticsAsTableDriver();
@@ -112,24 +160,8 @@ public class CarbondataSplitManager implements ConnectorSplitManager {
     return null;
   }
 
-  /**
-   *
-   * @param constraint
-   * @return
-   */
-  public List<CarbondataColumnConstraint> getColumnConstraints(
-      TupleDomain<ColumnHandle> constraint) {
-    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
-    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
-        .get()) {
-      CarbondataColumnHandle columnHandle =
-          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
-
-      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
-          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
-    }
-
-    return constraintBuilder.build();
+  private static List<HostAddress> getHostAddresses(String[] hosts) {
+    return Arrays.stream(hosts).map(HostAddress::fromString).collect(toImmutableList());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
deleted file mode 100755
index 6dbd92f..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableHandle.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Locale.ENGLISH;
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ConnectorTableHandle;
-import com.facebook.presto.spi.SchemaTableName;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Joiner;
-
-public class CarbondataTableHandle implements ConnectorTableHandle {
-
-  private final String connectorId;
-  private final SchemaTableName schemaTableName;
-
-  @JsonCreator public CarbondataTableHandle(@JsonProperty("connectorId") String connectorId,
-      @JsonProperty("schemaTableName") SchemaTableName schemaTableName) {
-    this.connectorId = requireNonNull(connectorId.toLowerCase(ENGLISH), "connectorId is null");
-    this.schemaTableName = schemaTableName;
-  }
-
-  @JsonProperty public String getConnectorId() {
-    return connectorId;
-  }
-
-  @JsonProperty public SchemaTableName getSchemaTableName() {
-    return schemaTableName;
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(connectorId, schemaTableName);
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    CarbondataTableHandle other = (CarbondataTableHandle) obj;
-    return Objects.equals(this.connectorId, other.connectorId) && this.schemaTableName
-        .equals(other.getSchemaTableName());
-  }
-
-  @Override public String toString() {
-    return Joiner.on(":").join(connectorId, schemaTableName.toString());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
deleted file mode 100755
index 5c50ad5..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTableLayoutHandle.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.ColumnHandle;
-import com.facebook.presto.spi.ConnectorTableLayoutHandle;
-import com.facebook.presto.spi.predicate.TupleDomain;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-public class CarbondataTableLayoutHandle implements ConnectorTableLayoutHandle {
-  private final CarbondataTableHandle table;
-  private final TupleDomain<ColumnHandle> constraint;
-
-  @JsonCreator
-  public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
-      @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint) {
-    this.table = requireNonNull(table, "table is null");
-    this.constraint = requireNonNull(constraint, "constraint is null");
-  }
-
-  @JsonProperty public CarbondataTableHandle getTable() {
-    return table;
-  }
-
-  @JsonProperty public TupleDomain<ColumnHandle> getConstraint() {
-    return constraint;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-
-    CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
-    return Objects.equals(table, other.table) && Objects.equals(constraint, other.constraint);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(table, constraint);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("table", table).add("constraint", constraint).toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e193df0a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
deleted file mode 100755
index 40f8692..0000000
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataTransactionHandle.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.presto;
-
-import java.util.Objects;
-import java.util.UUID;
-
-import static java.util.Objects.requireNonNull;
-
-import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-public class CarbondataTransactionHandle implements ConnectorTransactionHandle {
-  private final UUID uuid;
-
-  public CarbondataTransactionHandle() {
-    this(UUID.randomUUID());
-  }
-
-  @JsonCreator public CarbondataTransactionHandle(@JsonProperty("uuid") UUID uuid) {
-    this.uuid = requireNonNull(uuid, "uuid is null");
-  }
-
-  @JsonProperty public UUID getUuid() {
-    return uuid;
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if ((obj == null) || (getClass() != obj.getClass())) {
-      return false;
-    }
-
-    return Objects.equals(uuid, ((CarbondataTransactionHandle) obj).uuid);
-  }
-
-  @Override public int hashCode() {
-    return Objects.hash(uuid);
-  }
-
-  @Override public String toString() {
-    return toStringHelper(this).add("uuid", uuid).toString();
-  }
-
-}