You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/25 03:31:42 UTC

[1/5] carbondata git commit: fix typo issues of integration/ presto

Repository: carbondata
Updated Branches:
  refs/heads/master 419ca2496 -> 9669c0b29


fix typo issues of integration/ presto


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e93a3fd
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e93a3fd
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e93a3fd

Branch: refs/heads/master
Commit: 0e93a3fdfd40740b0e360141c88fe88d4e813617
Parents: 419ca24
Author: chenliang613 <ch...@huawei.com>
Authored: Sun Apr 2 07:57:12 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:28:07 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          | 141 ++++
 .../carbondata/presto/CarbondataMetadata.java   | 271 +++++++
 .../presto/impl/CarbonLocalInputSplit.java      |  83 +++
 .../presto/impl/CarbonTableConfig.java          |  57 ++
 .../presto/impl/CarbonTableReader.java          | 723 +++++++++++++++++++
 5 files changed, 1275 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e93a3fd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
new file mode 100755
index 0000000..cc10165
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnHandle implements ColumnHandle {
+  private final String connectorId;
+  private final String columnName;
+
+  public boolean isInvertedIndex() {
+    return isInvertedIndex;
+  }
+
+  private final Type columnType;
+  private final int ordinalPosition;
+  private final int keyOrdinal;
+  private final int columnGroupOrdinal;
+
+  private final int columnGroupId;
+  private final String columnUniqueId;
+  private final boolean isInvertedIndex;
+
+  public boolean isMeasure() {
+    return isMeasure;
+  }
+
+  private final boolean isMeasure;
+
+  public int getKeyOrdinal() {
+    return keyOrdinal;
+  }
+
+  public int getColumnGroupOrdinal() {
+    return columnGroupOrdinal;
+  }
+
+  public int getColumnGroupId() {
+    return columnGroupId;
+  }
+
+  public String getColumnUniqueId() {
+    return columnUniqueId;
+  }
+
+  /**
+   * ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
+   * IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
+   * The columnhandle of clm3 : has ordinalposition = 3
+   */
+
+  @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
+      @JsonProperty("ordinalPosition") int ordinalPosition,
+      @JsonProperty("keyOrdinal") int keyOrdinal,
+      @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+      @JsonProperty("isMeasure") boolean isMeasure,
+      @JsonProperty("columnGroupId") int columnGroupId,
+      @JsonProperty("columnUniqueId") String columnUniqueId,
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.columnName = requireNonNull(columnName, "columnName is null");
+    this.columnType = requireNonNull(columnType, "columnType is null");
+
+    this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
+    this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
+    this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
+
+    this.isMeasure = isMeasure;
+    this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
+    this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
+    this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+  }
+
+  @JsonProperty public String getConnectorId() {
+    return connectorId;
+  }
+
+  @JsonProperty public String getColumnName() {
+    return columnName;
+  }
+
+  @JsonProperty public Type getColumnType() {
+    return columnType;
+  }
+
+  @JsonProperty public int getOrdinalPosition() {
+    return ordinalPosition;
+  }
+
+  public ColumnMetadata getColumnMetadata() {
+    return new ColumnMetadata(columnName, columnType, null, false);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(connectorId, columnName);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
+    }
+
+    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+    return Objects.equals(this.connectorId, other.connectorId) && Objects
+        .equals(this.columnName, other.columnName);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
+        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e93a3fd/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
new file mode 100755
index 0000000..d2c5ab6
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataMetadata implements ConnectorMetadata {
+  private final String connectorId;
+  private CarbonTableReader carbonTableReader;
+  private ClassLoader classLoader;
+
+  private Map<String, ColumnHandle> columnHandleMap;
+
+  @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public void putClassLoader(ClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+
+  @Override public List<String> listSchemaNames(ConnectorSession session) {
+    return listSchemaNamesInternal();
+  }
+
+  public List<String> listSchemaNamesInternal() {
+    List<String> ret;
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      ret = carbonTableReader.getSchemaNames();
+    }
+    return ret;
+  }
+
+  @Override
+  public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+
+        /*List<SchemaTableName> all = carbonTableReader.getTableList();
+        if(schemaNameOrNull != null)
+        {
+            return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
+        }
+        return all;*/
+
+    List<String> schemaNames;
+    if (schemaNameOrNull != null) {
+      schemaNames = ImmutableList.of(schemaNameOrNull);
+    } else {
+      schemaNames = carbonTableReader.getSchemaNames();
+    }
+
+    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+    for (String schemaName : schemaNames) {
+      for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+        builder.add(new SchemaTableName(schemaName, tableName));
+      }
+    }
+    return builder.build();
+  }
+
+  @Override
+  public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
+      SchemaTablePrefix prefix) {
+    requireNonNull(prefix, "SchemaTablePrefix is null");
+
+    ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+    for (SchemaTableName tableName : listTables(session, prefix)) {
+      ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+      if (tableMetadata != null) {
+        columns.put(tableName, tableMetadata.getColumns());
+      }
+    }
+    return columns.build();
+  }
+
+  //if prefix is null. return all tables
+  //if prefix is not null, just return this table
+  private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
+    if (prefix.getSchemaName() == null) {
+      return listTables(session, prefix.getSchemaName());
+    }
+    return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+  }
+
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
+    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+      return null;
+    }
+
+    CarbonTable cb = carbonTableReader.getTable(tableName);
+    if (cb == null) {
+      return null;
+    }
+
+    List<ColumnMetadata> spiCols = new LinkedList<>();
+    List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+    for (CarbonColumn col : carbonColumns) {
+      //show columns command will return these data
+      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+      spiCols.add(spiCol);
+    }
+
+    //carbondata connector's table metadata
+    return new ConnectorTableMetadata(tableName, spiCols);
+  }
+
+  @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
+
+    CarbondataTableHandle handle =
+        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    checkArgument(handle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
+
+    String schemaName = handle.getSchemaTableName().getSchemaName();
+    if (!listSchemaNamesInternal().contains(schemaName)) {
+      throw new SchemaNotFoundException(schemaName);
+    }
+
+    //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+    CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+    if (cb == null) {
+      throw new TableNotFoundException(handle.getSchemaTableName());
+    }
+
+    ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+    String tableName = handle.getSchemaTableName().getTableName();
+    for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+      ColumnSchema cs = column.getColumnSchema();
+
+      int complex = column.getComplexTypeOrdinal();
+      column.getNumberOfChild();
+      column.getListOfChildDimensions();
+
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
+              column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+    }
+
+    for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+      ColumnSchema cs = measure.getColumnSchema();
+
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
+              measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+    }
+
+    //should i cache it?
+    columnHandleMap = columnHandles.build();
+
+    return columnHandleMap;
+  }
+
+  @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
+      ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+    checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
+        .getColumnMetadata();
+  }
+
+  @Override
+  public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+    //check tablename is valid
+    //schema is exist
+    //tables is exist
+
+    //CarbondataTable  get from jar
+    return new CarbondataTableHandle(connectorId, tableName);
+  }
+
+  @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
+      ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
+      Optional<Set<ColumnHandle>> desiredColumns) {
+    CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+    ConnectorTableLayout layout = new ConnectorTableLayout(
+        new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+  }
+
+  @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
+      ConnectorTableLayoutHandle handle) {
+    return new ConnectorTableLayout(handle);
+  }
+
+  @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
+      ConnectorTableHandle table) {
+    return getTableMetadataInternal(table);
+  }
+
+  public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
+    CarbondataTableHandle carbondataTableHandle =
+        checkType(table, CarbondataTableHandle.class, "table");
+    checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
+    return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+  }
+
+  public static Type CarbondataType2SpiMapper(DataType colType) {
+    switch (colType) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+      case SHORT:
+        return SmallintType.SMALLINT;
+      case INT:
+        return IntegerType.INTEGER;
+      case LONG:
+        return BigintType.BIGINT;
+      case FLOAT:
+      case DOUBLE:
+        return DoubleType.DOUBLE;
+
+      case DECIMAL:
+        return DecimalType.createDecimalType();
+      case STRING:
+        return VarcharType.VARCHAR;
+      case DATE:
+        return DateType.DATE;
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
+
+            /*case DataType.MAP:
+            case DataType.ARRAY:
+            case DataType.STRUCT:
+            case DataType.NULL:*/
+
+      default:
+        return VarcharType.VARCHAR;
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e93a3fd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
new file mode 100755
index 0000000..ba8d9b5
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CarbonLocalInputSplit {
+
+  private static final long serialVersionUID = 3520344046772190207L;
+  private String segmentId;
+  private String path;
+  private long start;
+  private long length;
+  private List<String> locations;
+  private short version;
+
+  /**
+   * Number of BlockLets in a block
+   */
+  private int numberOfBlocklets = 0;
+
+  @JsonProperty public short getVersion() {
+    return version;
+  }
+
+  @JsonProperty public List<String> getLocations() {
+    return locations;
+  }
+
+  @JsonProperty public long getLength() {
+    return length;
+  }
+
+  @JsonProperty public long getStart() {
+    return start;
+  }
+
+  @JsonProperty public String getPath() {
+    return path;
+  }
+
+  @JsonProperty public String getSegmentId() {
+    return segmentId;
+  }
+
+  @JsonProperty public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
+
+  @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+      @JsonProperty("path") String path, @JsonProperty("start") long start,
+      @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
+      @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
+                                 @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
+      @JsonProperty("version") short version) {
+    this.path = path;
+    this.start = start;
+    this.length = length;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.numberOfBlocklets = numberOfBlocklets;
+    //this.tableBlockInfo = tableBlockInfo;
+    this.version = version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e93a3fd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
new file mode 100755
index 0000000..a0ef63f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.NotNull;
+
+public class CarbonTableConfig {
+
+  //read from config
+  private String dbPath;
+  private String tablePath;
+  private String storePath;
+
+  @NotNull public String getDbPath() {
+    return dbPath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setDbPath(String dbPath) {
+    this.dbPath = dbPath;
+    return this;
+  }
+
+  @NotNull public String getTablePath() {
+    return tablePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+    return this;
+  }
+
+  @NotNull public String getStorePath() {
+    return storePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setStorePath(String storePath) {
+    this.storePath = storePath;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e93a3fd/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
new file mode 100755
index 0000000..14ecfbc
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.*;
+import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CacheClient;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.thrift.TBase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonTableReader {
+
+  /** CarbonTableReader will be a facade of these utils
+   *
+   * 1:CarbonMetadata,(logic table)
+   * 2:FileFactory, (physic table file)
+   * 3:CarbonCommonFactory, (offer some )
+   * 4:DictionaryFactory, (parse dictionary util)
+   */
+
+  private CarbonTableConfig config;
+  private List<SchemaTableName> tableList;
+  private CarbonFile dbStore;
+  private FileFactory.FileType fileType;
+
+  //as a cache for Carbon reader
+  private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = requireNonNull(config, "CarbonTableConfig is null");
+    this.cc = new ConcurrentHashMap<>();
+  }
+
+  //for worker node to initialize carbon metastore
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+    if (!cc.containsKey(table)) {
+      try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
+          FileFactory.class.getClassLoader())) {
+        if (dbStore == null) {
+          fileType = FileFactory.getFileType(config.getStorePath());
+          try {
+            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+      updateSchemaTables();
+      parseCarbonMetadata(table);
+    }
+
+    if (cc.containsKey(table)) return cc.get(table);
+    else return null;
+  }
+
+  public List<String> getSchemaNames() {
+    return updateSchemaList();
+  }
+
+  //default PathFilter
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+
+  public boolean updateDbStore() {
+    if (dbStore == null) {
+      fileType = FileFactory.getFileType(config.getStorePath());
+      try {
+        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    return true;
+  }
+
+  public List<String> updateSchemaList() {
+    updateDbStore();
+
+    if (dbStore != null) {
+      List<String> scs =
+          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return scs;
+    } else return ImmutableList.of();
+  }
+
+  public Set<String> getTableNames(String schema) {
+    requireNonNull(schema, "schema is null");
+    return updateTableList(schema);
+  }
+
+  public Set<String> updateTableList(String dbName) {
+    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+        .collect(Collectors.toList());
+    if (schema.size() > 0) {
+      return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+          .collect(Collectors.toSet());
+    } else return ImmutableSet.of();
+  }
+
+  public CarbonTable getTable(SchemaTableName schemaTableName) {
+    try {
+      updateSchemaTables();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    requireNonNull(schemaTableName, "schemaTableName is null");
+    CarbonTable table = loadTableMetadata(schemaTableName);
+
+    return table;
+  }
+
+  public void updateSchemaTables() {
+    //update logic determine later
+    if (dbStore == null) {
+      updateSchemaList();
+    }
+
+    tableList = new LinkedList<>();
+    for (CarbonFile db : dbStore.listFiles()) {
+      if (!db.getName().endsWith(".mdt")) {
+        for (CarbonFile table : db.listFiles()) {
+          tableList.add(new SchemaTableName(db.getName(), table.getName()));
+        }
+      }
+    }
+  }
+
+  private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+    for (SchemaTableName table : tableList) {
+      if (!table.equals(schemaTableName)) continue;
+
+      return parseCarbonMetadata(table);
+    }
+    return null;
+  }
+
+  /**
+   * parse carbon metadata into cc(CarbonTableReader cache)
+   */
+  public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+    CarbonTable result = null;
+    try {
+      CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+      if (cache.isValid()) return cache.carbonTable;
+
+      //Step1: get table meta path, load carbon table param
+      String storePath = config.getStorePath();
+      cache.carbonTableIdentifier =
+          new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+              UUID.randomUUID().toString());
+      cache.carbonTablePath =
+          PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+      cc.put(table, cache);
+
+      //Step2: check file existed? read schema file
+      ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+        public TBase create() {
+          return new org.apache.carbondata.format.TableInfo();
+        }
+      };
+      ThriftReader thriftReader =
+          new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+
+      //Format Level TableInfo, need transfer to Code Level TableInfo
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo wrapperTableInfo = schemaConverter
+          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+              storePath);
+      wrapperTableInfo.setMetaDataFilepath(
+          CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+      //load metadata info into CarbonMetadata
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+      cache.tableInfo = wrapperTableInfo;
+      cache.carbonTable = CarbonMetadata.getInstance()
+          .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+      result = cache.carbonTable;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    return result;
+  }
+
+  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters) throws Exception {
+
+    //处理filter, 下推filter,将应用在Segment的索引上
+    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+    CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+    // get all valid segments and set them into the configuration
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+        segmentStatusManager.getValidAndInvalidSegments();
+
+    tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+    if (segments.getValidSegments().size() == 0) {
+      return new ArrayList<>(0);
+    }
+
+    // remove entry in the segment index if there are invalid segments
+    invalidSegments.addAll(segments.getInvalidSegments());
+    for (String invalidSegmentId : invalidSegments) {
+      invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+    }
+    if (invalidSegments.size() > 0) {
+      List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+          new ArrayList<>(invalidSegments.size());
+      for (String segId : invalidSegments) {
+        invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+      }
+      cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+    }
+
+    // get filter for segment
+    CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil
+        .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    //for each segment fetch blocks matching filter in Driver BTree
+    for (String segmentNo : tableCacheModel.segments) {
+      try {
+        List<DataRefNode> dataRefNodes =
+            getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
+                tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
+                updateStatusManager);
+        for (DataRefNode dataRefNode : dataRefNodes) {
+          BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+          TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+              updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+              updateStatusManager)) {
+            continue;
+          }
+          result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+              tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+              Arrays.asList(tableBlockInfo.getLocations()),
+              tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+              tableBlockInfo.getVersion().number()));
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    cacheClient.close();
+    return result;
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  private List<DataRefNode> getDataBlocksOfSegment(
+      FilterExpressionProcessor filterExpressionProcessor,
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
+      FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
+      SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
+    //QueryStatistic statistic = new QueryStatistic();
+
+    //读取Segment 内部的Index
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+            updateStatusManager);
+
+    List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+
+    if (null != segmentIndexMap) {
+      // build result
+      for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+        List<DataRefNode> filterredBlocks;
+        // if no filter is given get all blocks from Btree Index
+        if (null == resolver) {
+          filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+        } else {
+          //ignore filter
+          //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+
+          // apply filter and get matching blocks
+          filterredBlocks = filterExpressionProcessor
+              .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+                  absoluteTableIdentifier);
+        }
+        resultFilterredBlocks.addAll(filterredBlocks);
+      }
+    }
+    //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
+    return resultFilterredBlocks;
+  }
+
+  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+      UpdateVO updateDetails) {
+    if (null != updateDetails.getLatestUpdateTimestamp()
+        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+        .getRefreshedTimeStamp()) {
+      return true;
+    }
+    return false;
+  }
+
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    boolean isSegmentUpdated = false;
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+    segmentTaskIndexWrapper =
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    if (null != segmentTaskIndexWrapper) {
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+      }
+    }
+
+    // if segment tree is not loaded, load the segment tree
+    if (segmentIndexMap == null || isSegmentUpdated) {
+
+      List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+      List<String> segs = new ArrayList<>();
+      segs.add(segmentId);
+
+      FileSystem fs =
+          getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
+      List<InputSplit> splits = getSplit(fileStatusList, fs);
+
+      List<FileSplit> carbonSplits = new ArrayList<>();
+      for (InputSplit inputSplit : splits) {
+        FileSplit fileSplit = (FileSplit) inputSplit;
+        String segId = CarbonTablePath.DataPathUtil
+            .getSegmentId(fileSplit.getPath().toString());//这里的seperator应该怎么加??
+        if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+          continue;
+        }
+        carbonSplits.add(fileSplit);
+      }
+
+      List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+      for (FileSplit inputSplit : carbonSplits) {
+        if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
+            updateStatusManager, segmentId)) {
+
+          BlockletInfos blockletInfos = new BlockletInfos(0, 0,
+              0);//this level we do not need blocklet info!!!! Is this a trick?
+          tableBlockInfoList.add(
+              new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
+                  inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
+                  ColumnarFormatVersion
+                      .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//这里的null是否会异常?
+        }
+      }
+
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+      // get Btree blocks for given segment
+      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+      tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+      segmentTaskIndexWrapper =
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    }
+    return segmentIndexMap;
+  }
+
+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    String taskID = null;
+    if (null != carbonInputSplit) {
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+        return false;
+      }
+
+      if (null == taskKeys) {
+        return true;
+      }
+
+      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String bucketNo =
+          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+      String blockTimestamp = carbonInputSplit.getPath().getName()
+          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+              carbonInputSplit.getPath().getName().lastIndexOf('.'));
+      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskBucketHolder)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+      throws IOException {
+
+    Iterator split = fileStatusList.iterator();
+
+    List<InputSplit> splits = new ArrayList<>();
+
+    while (true) {
+      while (true) {
+        while (split.hasNext()) {
+          FileStatus file = (FileStatus) split.next();
+          Path path = file.getPath();
+          long length = file.getLen();
+          if (length != 0L) {
+            BlockLocation[] blkLocations;
+            if (file instanceof LocatedFileStatus) {
+              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+            } else {
+              blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
+            }
+
+            if (this.isSplitable()) {
+              long blockSize1 = file.getBlockSize();
+              long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+              long bytesRemaining;
+              int blkIndex;
+              for (
+                  bytesRemaining = length;
+                  (double) bytesRemaining / (double) splitSize > 1.1D;
+                  bytesRemaining -= splitSize) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+              }
+
+              if (bytesRemaining != 0L) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else {
+              splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                  blkLocations[0].getHosts()));
+            }
+          } else {
+            splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                new String[0]));
+          }
+        }
+        return splits;
+      }
+    }
+
+  }
+
+  private String[] getValidPartitions() {
+    //TODO: has to Identify partitions by partition pruning
+    return new String[] { "0" };
+  }
+
+  private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
+      List<FileStatus> result) throws IOException {
+    String[] partitionsToConsider = getValidPartitions();
+    if (partitionsToConsider.length == 0) {
+      throw new IOException("No partitions/data found");
+    }
+
+    FileSystem fs = null;
+
+    //PathFilter inputFilter = getDataFileFilter(job);
+
+    // get tokens for all the required FileSystem for table path
+        /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
+                job.getConfiguration());*/
+
+    //get all data files of valid partitions and segments
+    for (int i = 0; i < partitionsToConsider.length; ++i) {
+      String partition = partitionsToConsider[i];
+
+      for (int j = 0; j < segmentsToConsider.length; ++j) {
+        String segmentId = segmentsToConsider[j];
+        Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
+
+        try {
+          Configuration conf = new Configuration();
+          fs = segmentPath.getFileSystem(conf);
+          //fs.initialize(segmentPath.toUri(), conf);
+
+          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            //if(stat.getPath().toString().contains("carbondata"))//参看carbondata的carbonInputFilter的实现
+            if (DefaultFilter.accept(stat.getPath())) {
+              if (stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+              } else {
+                result.add(stat);
+              }
+            }
+          }
+        } catch (Exception ex) {
+          System.out.println(ex.toString());
+        }
+      }
+    }
+    return fs;
+  }
+
+  protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
+      PathFilter inputFilter) throws IOException {
+    RemoteIterator iter = fs.listLocatedStatus(path);
+
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = (LocatedFileStatus) iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * get data blocks of given btree
+   */
+  private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+    List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+    SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+    try {
+      IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+      IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+      // Add all blocks of btree into result
+      DataRefNodeFinder blockFinder =
+          new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+      DataRefNode startBlock =
+          blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+      DataRefNode endBlock =
+          blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+      while (startBlock != endBlock) {
+        blocks.add(startBlock);
+        startBlock = startBlock.getNextDataRefNode();
+      }
+      blocks.add(endBlock);
+
+    } catch (KeyGenException e) {
+      System.out.println("Could not generate start key" + e.getMessage());
+    }
+    return blocks;
+  }
+
+  private boolean isSplitable() {
+    try {
+      // Don't split the file if it is local file system
+      if (this.fileType == FileFactory.FileType.LOCAL) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  private long computeSplitSize(long blockSize, long minSize, long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
+
+  private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) && (offset
+          < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * get total number of rows. for count(*)
+   *
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  public long getRowCount() throws IOException, IndexBuilderException {
+    long rowCount = 0;
+        /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
+
+        // no of core to load the blocks in driver
+        //addSegmentsIfEmpty(job, absoluteTableIdentifier);
+        int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+        try {
+            numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+                    .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
+        } catch (NumberFormatException e) {
+            numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+        }
+        // creating a thread pool
+        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
+        List<Future<Map<String, AbstractIndex>>> loadedBlocks =
+                new ArrayList<Future<Map<String, AbstractIndex>>>();
+        //for each segment fetch blocks matching filter in Driver BTree
+        for (String segmentNo : this.segmentList) {
+            // submitting the task
+            loadedBlocks
+                    .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
+        }
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(1, TimeUnit.HOURS);
+        } catch (InterruptedException e) {
+            throw new IndexBuilderException(e);
+        }
+        try {
+            // adding all the rows of the blocks to get the total row
+            // count
+            for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
+                for (AbstractIndex abstractIndex : block.get().values()) {
+                    rowCount += abstractIndex.getTotalNumberOfRows();
+                }
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IndexBuilderException(e);
+        }*/
+    return rowCount;
+  }
+}


[2/5] carbondata git commit: Fixed Carbondata-848

Posted by ch...@apache.org.
Fixed Carbondata-848

remove Presto module '


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b6b6ef1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b6b6ef1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b6b6ef1

Branch: refs/heads/master
Commit: 4b6b6ef14b14999fa1face5e72abb26c3a752f81
Parents: 0e93a3f
Author: Bhavya <bh...@knoldus.com>
Authored: Tue Apr 4 14:19:49 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:28:43 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataRecordSetProvider.java     | 277 +++++++++++++++++++
 1 file changed, 277 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b6b6ef1/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..63b926f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    //this.config = requireNonNull(config, "config is null");
+    //this.connector = requireNonNull(connector, "connector is null");
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = reader;
+  }
+
+  @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+    requireNonNull(split, "split is null");
+    requireNonNull(columns, "columns is null");
+
+    // Convert split
+    CarbondataSplit cdSplit =
+        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+    String targetCols = "";
+    // Convert all columns handles
+    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    for (ColumnHandle handle : columns) {
+      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+      targetCols += ((CarbondataColumnHandle) handle).getColumnName() + ",";
+    }
+
+    // Build column projection(check the column order)
+    if (targetCols.length() > 0) {
+      targetCols = targetCols.substring(0, targetCols.length() - 1);
+    }
+    else
+    {
+      targetCols = null;
+    }
+    //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+    CarbonTableCacheModel tableCacheModel =
+        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+    // Build Query Model
+    CarbonTable targetTable = tableCacheModel.carbonTable;
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+    QueryModel queryModel =
+        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+    // Push down filter
+    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+    // Return new record set
+    return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+        handles.build(), queryModel);
+  }
+
+  // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+  private void fillFilter2QueryModel(QueryModel queryModel,
+      TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+    //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+    //Build Predicate Expression
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpresstion for Expresstion(Carbondata)
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      DataType coltype = Spi2CarbondataTypeMapper(type);
+      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+        //new Expression()
+      }
+
+      if (domain.getValues().isAll()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  //greater.setRangeExpression(true);
+                  rangeFilter.add(greater);
+                }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                //greater.setRangeExpression(true);
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                //less.setRangeExpression(true);
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                //less2.setRangeExpression(true);
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+          }
+        }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+            }
+          }
+        } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0));
+      }
+    }
+
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else return;
+
+    // todo set into QueryModel
+    CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+    queryModel.setFilterExpressionResolverTree(
+        CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+  }
+
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
+}


[3/5] carbondata git commit: [CARBONDATA-872] Fix comment issues of integration/presto for easier reading

Posted by ch...@apache.org.
[CARBONDATA-872] Fix comment issues of integration/presto for easier reading

fix comments

Add comments for some classes and functions

update annotation format

delete checkstyle directory


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4422c525
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4422c525
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4422c525

Branch: refs/heads/master
Commit: 4422c525623ecf49ce1395bb6069f73e48e42fa1
Parents: 4b6b6ef
Author: ffpeng90 <ff...@126.com>
Authored: Wed Apr 5 23:00:50 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:30:15 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnConstraint.java      |  86 ++++++
 .../presto/CarbondataColumnHandle.java          |   6 -
 .../presto/CarbondataConnectorFactory.java      |  94 +++++++
 .../carbondata/presto/CarbondataMetadata.java   |   7 -
 .../presto/CarbondataRecordCursor.java          | 158 +++++++++++
 .../carbondata/presto/CarbondataRecordSet.java  | 110 ++++++++
 .../presto/CarbondataRecordSetProvider.java     |  14 +-
 .../presto/CarbondataSplitManager.java          | 279 +++++++++++++++++++
 .../presto/impl/CarbonTableCacheModel.java      |  44 +++
 .../presto/impl/CarbonTableConfig.java          |   3 +
 .../presto/impl/CarbonTableReader.java          |  40 ++-
 11 files changed, 793 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..82c7c78
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulating presto Tuple-domain
+ */
+public class CarbondataColumnConstraint {
+  private final String name;
+  private final boolean invertedindexed;
+  private Optional<Domain> domain;
+
+  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
+      @JsonProperty("domain") Optional<Domain> domain,
+      @JsonProperty("invertedindexed") boolean invertedindexed) {
+    this.name = requireNonNull(name, "name is null");
+    this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+    this.domain = requireNonNull(domain, "domain is null");
+  }
+
+  @JsonProperty public boolean isInvertedindexed() {
+    return invertedindexed;
+  }
+
+  @JsonProperty public String getName() {
+    return name;
+  }
+
+  @JsonProperty public Optional<Domain> getDomain() {
+    return domain;
+  }
+
+  @JsonSetter public void setDomain(Optional<Domain> domain) {
+    this.domain = domain;
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(name, domain, invertedindexed);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
+    }
+
+    CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+    return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
+        && Objects.equals(this.invertedindexed, other.invertedindexed);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedindexed)
+        .add("domain", this.domain).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index cc10165..b9152b5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -68,12 +68,6 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return columnUniqueId;
   }
 
-  /**
-   * ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
-   * IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
-   * The columnhandle of clm3 : has ordinalposition = 3
-   */
-
   @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
       @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
       @JsonProperty("ordinalPosition") int ordinalPosition,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..66c007d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Build Carbondata Connector
+ * It will be called by CarbondataPlugin
+ */
+public class CarbondataConnectorFactory implements ConnectorFactory {
+
+  private final String name;
+  private final ClassLoader classLoader;
+
+  public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+    this.name = connectorName;
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public ConnectorHandleResolver getHandleResolver() {
+    return new CarbondataHandleResolver();
+  }
+
+  @Override public Connector create(String connectorId, Map<String, String> config,
+      ConnectorContext context) {
+    requireNonNull(config, "config is null");
+
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      Bootstrap app = new Bootstrap(new JsonModule(),
+          new CarbondataModule(connectorId, context.getTypeManager()));
+
+      Injector injector =
+          app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
+              .initialize();
+
+      LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+      CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
+      ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+      ConnectorRecordSetProvider connectorRecordSet =
+          injector.getInstance(ConnectorRecordSetProvider.class);
+      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
+
+      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
+      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
+      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+
+      return new CarbondataConnector(lifeCycleManager, metadata,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
+          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
+          classLoader
+          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+      );
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d2c5ab6..d938a3d 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -69,13 +69,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
   @Override
   public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
 
-        /*List<SchemaTableName> all = carbonTableReader.getTableList();
-        if(schemaNameOrNull != null)
-        {
-            return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
-        }
-        return all;*/
-
     List<String> schemaNames;
     if (schemaNameOrNull != null) {
       schemaNames = ImmutableList.of(schemaNameOrNull);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
new file mode 100755
index 0000000..ad47f75
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.base.Strings;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class CarbondataRecordCursor implements RecordCursor {
+
+  private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+  private final List<CarbondataColumnHandle> columnHandles;
+
+  private List<String> fields;
+  private CarbondataSplit split;
+  private CarbonIterator<Object[]> rowCursor;
+  private CarbonReadSupport<Object[]> readSupport;
+
+  private long totalBytes;
+  private long nanoStart;
+  private long nanoEnd;
+
+  public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+      CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
+      CarbondataSplit split) {
+    this.rowCursor = carbonIterator;
+    this.columnHandles = columnHandles;
+    this.readSupport = readSupport;
+    this.totalBytes = 0;
+  }
+
+  @Override public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  @Override public long getCompletedBytes() {
+    return totalBytes;
+  }
+
+  @Override public long getReadTimeNanos() {
+    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+  }
+
+  @Override public Type getType(int field) {
+
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return columnHandles.get(field).getColumnType();
+  }
+
+  /**
+   * get next Row/Page
+   */
+  @Override public boolean advanceNextPosition() {
+
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
+    }
+
+    if (rowCursor.hasNext()) {
+      Object[] columns = readSupport.readRow(rowCursor.next());
+      fields = new ArrayList<String>();
+      if(columns != null && columns.length > 0)
+      {
+        for(Object value : columns){
+          if(value != null )
+          {
+            fields.add(value.toString());
+          } else {
+            fields.add(null);
+          }
+        }
+      }
+      totalBytes += columns.length;
+      return true;
+    }
+    return false;
+  }
+
+  @Override public boolean getBoolean(int field) {
+    checkFieldType(field, BOOLEAN);
+    return Boolean.parseBoolean(getFieldValue(field));
+  }
+
+  @Override public long getLong(int field) {
+    String timeStr = getFieldValue(field);
+    Long milliSec = 0L;
+
+    //suppose the
+    return Math.round(Double.parseDouble(getFieldValue(field)));
+  }
+
+  @Override public double getDouble(int field) {
+    checkFieldType(field, DOUBLE);
+    return Double.parseDouble(getFieldValue(field));
+  }
+
+  @Override public Slice getSlice(int field) {
+    checkFieldType(field, VARCHAR);
+    return Slices.utf8Slice(getFieldValue(field));
+  }
+
+  @Override public Object getObject(int field) {
+    return null;
+  }
+
+  @Override public boolean isNull(int field) {
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return Strings.isNullOrEmpty(getFieldValue(field));
+  }
+
+  String getFieldValue(int field) {
+    checkState(fields != null, "Cursor has not been advanced yet");
+    return fields.get(field);
+  }
+
+  private void checkFieldType(int field, Type expected) {
+    Type actual = getType(field);
+    checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
+        expected, actual);
+  }
+
+  @Override public void close() {
+    nanoEnd = System.nanoTime();
+
+    //todo  delete cache from readSupport
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
new file mode 100755
index 0000000..d3fd7a0
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+
+public class CarbondataRecordSet implements RecordSet {
+
+  private CarbonTable carbonTable;
+  private TupleDomain<ColumnHandle> originalConstraint;
+  private Expression carbonConstraint;
+  private List<CarbondataColumnConstraint> rebuildConstraints;
+  private QueryModel queryModel;
+  private CarbondataSplit split;
+  private List<CarbondataColumnHandle> columns;
+  private QueryExecutor queryExecutor;
+
+  private CarbonReadSupport<Object[]> readSupport;
+
+  public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
+      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+    this.carbonTable = carbonTable;
+    this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+    this.originalConstraint = this.split.getConstraints();
+    this.rebuildConstraints = this.split.getRebuildConstraints();
+    this.queryModel = queryModel;
+    this.columns = columns;
+    this.readSupport = new DictionaryDecodeReadSupport();
+  }
+
+  //todo support later
+  private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+    return null;
+  }
+
+  @Override public List<Type> getColumnTypes() {
+    return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+  }
+
+  /**
+   * get data blocks via Carbondata QueryModel API
+   */
+  @Override public RecordCursor cursor() {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+
+    tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+        split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
+        split.getLocalInputSplit().getLocations().toArray(new String[0]),
+        split.getLocalInputSplit().getLength(),
+        //blockletInfos,
+        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+
+    queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+
+    //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+    try {
+      readSupport
+          .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+      CarbonIterator<Object[]> carbonIterator =
+          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+      return rc;
+    } catch (QueryExecutionException e) {
+      //throw new InterruptedException(e.getMessage());
+      System.out.println(e.getMessage());
+    } catch (Exception ex) {
+      System.out.println(ex.toString());
+    }
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 63b926f..8b087df 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -72,7 +72,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     requireNonNull(split, "split is null");
     requireNonNull(columns, "columns is null");
 
-    // Convert split
     CarbondataSplit cdSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
     checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
@@ -111,11 +110,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
 
     // Return new record set
-    return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+    return new CarbondataRecordSet(targetTable, session, cdSplit,
         handles.build(), queryModel);
   }
 
-  // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+  // Build filter for QueryModel
   private void fillFilter2QueryModel(QueryModel queryModel,
       TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
 
@@ -139,14 +138,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
 
       if (domain.getValues().isNone()) {
-        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
-        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
-        //new Expression()
       }
 
       if (domain.getValues().isAll()) {
-        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
-        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
       }
 
       List<Object> singleValues = new ArrayList<>();
@@ -166,7 +160,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
                 } else {
                   GreaterThanExpression greater = new GreaterThanExpression(colExpression,
                       new LiteralExpression(value, coltype));
-                  //greater.setRangeExpression(true);
                   rangeFilter.add(greater);
                 }
                 break;
@@ -174,7 +167,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
                 GreaterThanEqualToExpression greater =
                     new GreaterThanEqualToExpression(colExpression,
                         new LiteralExpression(value, coltype));
-                //greater.setRangeExpression(true);
                 rangeFilter.add(greater);
                 break;
               case BELOW:
@@ -191,13 +183,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
               case EXACTLY:
                 LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
                     new LiteralExpression(value, coltype));
-                //less.setRangeExpression(true);
                 rangeFilter.add(less);
                 break;
               case BELOW:
                 LessThanExpression less2 =
                     new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                //less2.setRangeExpression(true);
                 rangeFilter.add(less2);
                 break;
               default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
new file mode 100755
index 0000000..f3efb36
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Build Carbontable splits
+ * filtering irrelevant blocks
+ */
+public class CarbondataSplitManager implements ConnectorSplitManager {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorTableLayoutHandle layout) {
+    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+    CarbondataTableHandle tableHandle = layoutHandle.getTable();
+    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    // Packaging presto-TupleDomain into CarbondataColumnConstraint, to decouple from presto-spi Module
+    List<CarbondataColumnConstraint> rebuildConstraints =
+        getColumnConstraints(layoutHandle.getConstraint());
+
+    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+    if (cache != null) {
+      try {
+        List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
+
+        ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+        for (CarbonLocalInputSplit split : splits) {
+          cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
+              layoutHandle.getConstraint(), split, rebuildConstraints));
+        }
+        return new FixedSplitSource(cSplits.build());
+      } catch (Exception ex) {
+        System.out.println(ex.toString());
+      }
+    }
+    return null;
+  }
+
+  public List<CarbondataColumnConstraint> getColumnConstraints(
+      TupleDomain<ColumnHandle> constraint) {
+    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
+        .get()) {
+      CarbondataColumnHandle columnHandle =
+          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
+          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
+    }
+
+    return constraintBuilder.build();
+  }
+
+  /**
+   * Convert presto-TupleDomain predication into Carbon scan express condition
+   * @param originalConstraint  presto-TupleDomain
+   * @param carbonTable
+   * @return
+   */
+  public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
+      CarbonTable carbonTable) {
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+      Optional<CarbonColumn> target =
+          ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+
+      if (target.get() == null) return null;
+
+      DataType coltype = target.get().getDataType();
+      ColumnExpression colExpression =
+          new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
+      //colExpression.setColIndex(cs.getSchemaOrdinal());
+      colExpression.setDimension(target.get().isDimesion());
+      colExpression.setDimension(
+          carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+      colExpression.setCarbonColumn(target.get());
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+      }
+
+      if (domain.getValues().isAll()) {
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  rangeFilter.add(greater);
+                }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+          }
+        }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+            }
+          }
+        } else if (rangeFilter.size() == 1)//only have one value
+          filters.add(rangeFilter.get(0));
+      }
+    }
+
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else//no filter
+      return null;
+
+    return finalFilters;
+  }
+
+  /**
+   * Convert presto spi Type into Carbondata Type
+   * @param colType
+   * @return
+   */
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
new file mode 100755
index 0000000..45755d1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Caching Carbon meta(e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in Class CarbonTableReader
+ * to speed up query
+ */
+public class CarbonTableCacheModel {
+
+  public CarbonTableIdentifier carbonTableIdentifier;
+  public CarbonTablePath carbonTablePath;
+
+  public TableInfo tableInfo;
+  public CarbonTable carbonTable;
+  public String[] segments;
+
+  public boolean isValid() {
+    if (carbonTable != null && carbonTablePath != null && carbonTableIdentifier != null)
+      return true;
+    else return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index a0ef63f..677cefd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -21,6 +21,9 @@ import io.airlift.configuration.Config;
 
 import javax.validation.constraints.NotNull;
 
+/**
+ * Configuration read from etc/catalog/carbondata.properties
+ */
 public class CarbonTableConfig {
 
   //read from config

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4422c525/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 14ecfbc..bb482b0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -71,22 +71,21 @@ import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 
+/** CarbonTableReader will be a facade of these utils
+ *
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ */
 public class CarbonTableReader {
 
-  /** CarbonTableReader will be a facade of these utils
-   *
-   * 1:CarbonMetadata,(logic table)
-   * 2:FileFactory, (physic table file)
-   * 3:CarbonCommonFactory, (offer some )
-   * 4:DictionaryFactory, (parse dictionary util)
-   */
-
   private CarbonTableConfig config;
   private List<SchemaTableName> tableList;
   private CarbonFile dbStore;
   private FileFactory.FileType fileType;
 
-  //as a cache for Carbon reader
+  // A cache for Carbon reader
   private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
 
   @Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -94,7 +93,7 @@ public class CarbonTableReader {
     this.cc = new ConcurrentHashMap<>();
   }
 
-  //for worker node to initialize carbon metastore
+  // for worker node to initialize carbon metastore
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
     if (!cc.containsKey(table)) {
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
@@ -120,7 +119,7 @@ public class CarbonTableReader {
     return updateSchemaList();
   }
 
-  //default PathFilter
+  // default PathFilter
   private static final PathFilter DefaultFilter = new PathFilter() {
     @Override public boolean accept(Path path) {
       return CarbonTablePath.isCarbonDataFile(path.getName());
@@ -177,7 +176,7 @@ public class CarbonTableReader {
   }
 
   public void updateSchemaTables() {
-    //update logic determine later
+    // update logic determine later
     if (dbStore == null) {
       updateSchemaList();
     }
@@ -232,14 +231,14 @@ public class CarbonTableReader {
           (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();
 
-      //Format Level TableInfo, need transfer to Code Level TableInfo
+      // Step3: Transform Format Level TableInfo to Code Level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               storePath);
       wrapperTableInfo.setMetaDataFilepath(
           CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-      //load metadata info into CarbonMetadata
+      // Step4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
       cache.tableInfo = wrapperTableInfo;
@@ -256,7 +255,7 @@ public class CarbonTableReader {
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters) throws Exception {
 
-    //处理filter, 下推filter,将应用在Segment的索引上
+    // need apply filters to segment
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -297,7 +296,7 @@ public class CarbonTableReader {
         .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
 
     List<CarbonLocalInputSplit> result = new ArrayList<>();
-    //for each segment fetch blocks matching filter in Driver BTree
+    // for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : tableCacheModel.segments) {
       try {
         List<DataRefNode> dataRefNodes =
@@ -338,7 +337,7 @@ public class CarbonTableReader {
     //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
     //QueryStatistic statistic = new QueryStatistic();
 
-    //读取Segment 内部的Index
+    // read segment index
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
             updateStatusManager);
@@ -349,13 +348,10 @@ public class CarbonTableReader {
       // build result
       for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
         List<DataRefNode> filterredBlocks;
-        // if no filter is given get all blocks from Btree Index
+        // if no filter is given, get all blocks from Btree Index
         if (null == resolver) {
           filterredBlocks = getDataBlocksOfIndex(abstractIndex);
         } else {
-          //ignore filter
-          //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-
           // apply filter and get matching blocks
           filterredBlocks = filterExpressionProcessor
               .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
@@ -568,12 +564,10 @@ public class CarbonTableReader {
         try {
           Configuration conf = new Configuration();
           fs = segmentPath.getFileSystem(conf);
-          //fs.initialize(segmentPath.toUri(), conf);
 
           RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
           while (iter.hasNext()) {
             LocatedFileStatus stat = iter.next();
-            //if(stat.getPath().toString().contains("carbondata"))//参看carbondata的carbonInputFilter的实现
             if (DefaultFilter.accept(stat.getPath())) {
               if (stat.isDirectory()) {
                 addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);


[5/5] carbondata git commit: Added support for DecimalType and Timestamp in Presto Integration with Spark2.1

Posted by ch...@apache.org.
Added support for DecimalType and Timestamp in Presto Integration with Spark2.1

Added support for DecimalType and Fixed Date and time issues

Fixed the Apache License and removed comments

Corrected indentation and spaces


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9669c0b2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9669c0b2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9669c0b2

Branch: refs/heads/master
Commit: 9669c0b291a2fd00597aa7316d96c3a8fc55d4ad
Parents: 7c0e660
Author: Bhavya <bh...@knoldus.com>
Authored: Mon Apr 17 11:51:44 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:30:51 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          |  25 ++-
 .../carbondata/presto/CarbondataConnector.java  |  83 +++++++++
 .../presto/CarbondataConnectorFactory.java      |   5 +-
 .../carbondata/presto/CarbondataMetadata.java   |  23 +--
 .../carbondata/presto/CarbondataModule.java     |  81 +++++++++
 .../carbondata/presto/CarbondataPageSource.java | 178 +++++++++++++++++++
 .../presto/CarbondataPageSourceProvider.java    |  50 ++++++
 .../presto/CarbondataRecordCursor.java          |  60 ++++++-
 .../carbondata/presto/CarbondataRecordSet.java  |   8 +-
 .../presto/CarbondataRecordSetProvider.java     |  12 +-
 10 files changed, 499 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index b9152b5..4a9b7ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -46,6 +46,14 @@ public class CarbondataColumnHandle implements ColumnHandle {
   private final String columnUniqueId;
   private final boolean isInvertedIndex;
 
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  private int precision;
+
+
   public boolean isMeasure() {
     return isMeasure;
   }
@@ -76,7 +84,9 @@ public class CarbondataColumnHandle implements ColumnHandle {
       @JsonProperty("isMeasure") boolean isMeasure,
       @JsonProperty("columnGroupId") int columnGroupId,
       @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
+      @JsonProperty("precision") int precision,
+      @JsonProperty("scale") int scale) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null");
     this.columnName = requireNonNull(columnName, "columnName is null");
     this.columnType = requireNonNull(columnType, "columnType is null");
@@ -89,6 +99,8 @@ public class CarbondataColumnHandle implements ColumnHandle {
     this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
     this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
     this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+    this.precision = precision;
+    this.scale = scale;
   }
 
   @JsonProperty public String getConnectorId() {
@@ -132,4 +144,15 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
         .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
   }
+
+  @JsonProperty public int getScale() {
+    return scale;
+  }
+
+  @JsonProperty public int getPrecision() {
+    return precision;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..406ed93
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnector implements Connector {
+
+  private static final Logger log = Logger.get(CarbondataConnector.class);
+
+  private final LifeCycleManager lifeCycleManager;
+  private final CarbondataMetadata metadata;
+  private final ConnectorSplitManager splitManager;
+  private final ConnectorRecordSetProvider recordSetProvider;
+  private final ClassLoader classLoader;
+  private final ConnectorPageSourceProvider pageSourceProvider;
+
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
+    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.splitManager = requireNonNull(splitManager, "splitManager is null");
+    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+  }
+
+  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+      boolean readOnly) {
+    checkConnectorSupports(READ_COMMITTED, isolationLevel);
+    return CarbondataTransactionHandle.INSTANCE;
+  }
+
+  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+    metadata.putClassLoader(classLoader);
+    return metadata;
+  }
+
+  @Override public ConnectorSplitManager getSplitManager() {
+    return splitManager;
+  }
+
+  @Override public ConnectorRecordSetProvider getRecordSetProvider() {
+    return recordSetProvider;
+  }
+
+  @Override
+  public ConnectorPageSourceProvider getPageSourceProvider()
+  {
+    return pageSourceProvider;
+  }
+
+  @Override public final void shutdown() {
+    try {
+      lifeCycleManager.stop();
+    } catch (Exception e) {
+      log.error(e, "Error shutting down connector");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d1c8082..d97f19e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.google.common.base.Throwables;
 import com.google.inject.Injector;
@@ -70,10 +71,12 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
+       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          classLoader
+          classLoader,
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f2d594a..7701490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -123,9 +123,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
-          columnType);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), columnType);
       columnsMetaList.add(columnMeta);
     }
 
@@ -162,21 +161,21 @@ public class CarbondataMetadata implements ConnectorMetadata {
       column.getNumberOfChild();
       column.getListOfChildDimensions();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
               column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
       ColumnSchema cs = measure.getColumnSchema();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
               measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     //should i cache it?
@@ -230,7 +229,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return getTableMetadata(carbondataTableHandle.getSchemaTableName());
   }
 
-  public static Type CarbondataType2SpiMapper(DataType colType) {
+  public static Type CarbondataType2SpiMapper(ColumnSchema columnSchema) {
+    DataType colType = columnSchema.getDataType();
     switch (colType) {
       case BOOLEAN:
         return BooleanType.BOOLEAN;
@@ -243,9 +243,12 @@ public class CarbondataMetadata implements ConnectorMetadata {
       case FLOAT:
       case DOUBLE:
         return DoubleType.DOUBLE;
-
       case DECIMAL:
-        return DecimalType.createDecimalType();
+        if(columnSchema.getPrecision() > 0){
+          return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+        } else {
+          return DecimalType.createDecimalType();
+        }
       case STRING:
         return VarcharType.VARCHAR;
       case DATE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
new file mode 100755
index 0000000..1d8b2b2
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+  private final String connectorId;
+  private final TypeManager typeManager;
+
+  public CarbondataModule(String connectorId, TypeManager typeManager) {
+    this.connectorId = requireNonNull(connectorId, "connector id is null");
+    this.typeManager = requireNonNull(typeManager, "typeManager is null");
+  }
+
+  @Override public void configure(Binder binder) {
+    binder.bind(TypeManager.class).toInstance(typeManager);
+
+    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(CarbonTableConfig.class);
+  }
+
+  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+    private final TypeManager typeManager;
+
+    @Inject public TypeDeserializer(TypeManager typeManager) {
+      super(Type.class);
+      this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
+    @Override protected Type _deserialize(String value, DeserializationContext context) {
+      Type type = typeManager.getType(parseTypeSignature(value));
+      checkArgument(type != null, "Unknown type %s", value);
+      return type;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
new file mode 100644
index 0000000..7c50c66
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.math.RoundingMode.HALF_UP;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
+ */
+public class CarbondataPageSource implements ConnectorPageSource {
+
+  private static final int ROWS_PER_REQUEST = 4096;
+  private final RecordCursor cursor;
+  private final List<Type> types;
+  private final PageBuilder pageBuilder;
+  private boolean closed;
+  private final char[] buffer = new char[100];
+
+  public CarbondataPageSource(RecordSet recordSet)
+  {
+    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+  }
+
+  public CarbondataPageSource(List<Type> types, RecordCursor cursor)
+  {
+    this.cursor = requireNonNull(cursor, "cursor is null");
+    this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+    this.pageBuilder = new PageBuilder(this.types);
+  }
+
+  public RecordCursor getCursor()
+  {
+    return cursor;
+  }
+
+  @Override public long getTotalBytes() {
+    return cursor.getTotalBytes();
+  }
+
+  @Override public long getCompletedBytes() {
+    return cursor.getCompletedBytes();
+  }
+
+  @Override public long getReadTimeNanos() {
+    return cursor.getReadTimeNanos();
+  }
+
+  @Override public boolean isFinished() {
+    return closed && pageBuilder.isEmpty();
+  }
+
+  @Override public Page getNextPage() {
+    if (!closed) {
+      int i;
+      for (i = 0; i < ROWS_PER_REQUEST; i++) {
+        if (pageBuilder.isFull()) {
+          break;
+        }
+        if (!cursor.advanceNextPosition()) {
+          closed = true;
+          break;
+        }
+
+        pageBuilder.declarePosition();
+        for (int column = 0; column < types.size(); column++) {
+          BlockBuilder output = pageBuilder.getBlockBuilder(column);
+          if (cursor.isNull(column)) {
+            output.appendNull();
+          } else {
+            Type type = types.get(column);
+            Class<?> javaType = type.getJavaType();
+            if (javaType == boolean.class) {
+              type.writeBoolean(output, cursor.getBoolean(column));
+            } else if (javaType == long.class) {
+              type.writeLong(output, cursor.getLong(column));
+            } else if (javaType == double.class) {
+              type.writeDouble(output, cursor.getDouble(column));
+            } else if (javaType == Slice.class) {
+              Slice slice = cursor.getSlice(column);
+              if(type instanceof  DecimalType)
+              {
+                if (isShortDecimal(type)) {
+                  type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
+                } else {
+                  type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
+                }
+              } else {
+                type.writeSlice(output, slice, 0, slice.length());
+              }
+            } else {
+              type.writeObject(output, cursor.getObject(column));
+            }
+          }
+        }
+      }
+    }
+
+    // only return a page if the buffer is full or we are finishing
+    if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
+      return null;
+    }
+    Page page = pageBuilder.build();
+    pageBuilder.reset();
+    return page;
+ }
+
+  @Override public long getSystemMemoryUsage() {
+    return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+  }
+
+  @Override public void close() throws IOException {
+    closed = true;
+    cursor.close();
+
+  }
+
+  private long parseLong(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return decimal.unscaledValue().longValue();
+  }
+
+
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return encodeUnscaledValue(decimal.unscaledValue());
+  }
+
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
+  {
+    checkArgument(length < buffer.length);
+    for (int i = 0; i < length; i++) {
+      buffer[i] = (char) slice.getByte(offset + i);
+    }
+    BigDecimal decimal = new BigDecimal(buffer, 0, length);
+    checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+    decimal = decimal.setScale(type.getScale(), HALF_UP);
+    checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+    return decimal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
new file mode 100644
index 0000000..46d8611
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider Class for Carbondata Page Source class.
+ */
+public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+
+  private CarbondataRecordSetProvider carbondataRecordSetProvider;
+
+  @Inject
+  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
+  {
+    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider is null");
+  }
+
+  @Override
+  public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    return new CarbondataPageSource(carbondataRecordSetProvider.getRecordSet(transactionHandle, session, split, columns));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index ad47f75..2e97dc0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -18,7 +18,14 @@
 package org.apache.carbondata.presto;
 
 import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+
 import com.google.common.base.Strings;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
@@ -26,16 +33,22 @@ import io.airlift.slice.Slices;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 
 import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
 import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
 
 public class CarbondataRecordCursor implements RecordCursor {
 
@@ -114,8 +127,10 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public long getLong(int field) {
     String timeStr = getFieldValue(field);
-    Long milliSec = 0L;
-
+    Type actual = getType(field);
+    if(actual instanceof TimestampType){
+      return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+    }
     //suppose the
     return Math.round(Double.parseDouble(getFieldValue(field)));
   }
@@ -126,8 +141,41 @@ public class CarbondataRecordCursor implements RecordCursor {
   }
 
   @Override public Slice getSlice(int field) {
-    checkFieldType(field, VARCHAR);
-    return Slices.utf8Slice(getFieldValue(field));
+    Type decimalType = getType(field);
+    if (decimalType instanceof DecimalType) {
+      DecimalType actual = (DecimalType) decimalType;
+      CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
+      if(carbondataColumnHandle.getPrecision() > 0 ) {
+        checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), carbondataColumnHandle.getScale()));
+      } else {
+        checkFieldType(field, DecimalType.createDecimalType());
+      }
+      String fieldValue = getFieldValue(field);
+      BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+      if (isShortDecimal(decimalType)) {
+        return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
+      } else {
+        if (bigDecimalValue.scale() > actual.getScale()) {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(),
+                  bigDecimalValue.scale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+        } else {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+
+        }
+
+      }
+    } else {
+      checkFieldType(field, VARCHAR);
+      return utf8Slice(getFieldValue(field));
+    }
   }
 
   @Override public Object getObject(int field) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d3fd7a0..7bf0e84 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -99,12 +99,10 @@ public class CarbondataRecordSet implements RecordSet {
       RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
       return rc;
     } catch (QueryExecutionException e) {
-      //throw new InterruptedException(e.getMessage());
-      System.out.println(e.getMessage());
-    } catch (Exception ex) {
-      System.out.println(ex.toString());
+       throw new RuntimeException(e.getMessage(), e);
+   } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9669c0b2/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f0958c7..a9652cc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -129,7 +129,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
       Type type = cdch.getColumnType();
 
-      DataType coltype = Spi2CarbondataTypeMapper(type);
+      DataType coltype = Spi2CarbondataTypeMapper(cdch);
       Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
 
       domain = originalConstraint.getDomains().get().get(c);
@@ -200,6 +200,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         if (coltype.equals(DataType.STRING)) {
           ex = new EqualToExpression(colExpression,
               new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+          Long value = (Long) singleValues.get(0) * 1000;
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(value , coltype));
         } else ex = new EqualToExpression(colExpression,
             new LiteralExpression(singleValues.get(0), coltype));
         filters.add(ex);
@@ -241,16 +245,18 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
   }
 
-  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+  public static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
+    Type colType = carbondataColumnHandle.getColumnType();
     if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
     else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
     else if (colType == IntegerType.INTEGER) return DataType.INT;
     else if (colType == BigintType.BIGINT) return DataType.LONG;
     else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
     else if (colType == VarcharType.VARCHAR) return DataType.STRING;
     else if (colType == DateType.DATE) return DataType.DATE;
     else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
     else return DataType.STRING;
   }
 


[4/5] carbondata git commit: refactor integration/presto

Posted by ch...@apache.org.
refactor integration/presto


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7c0e6606
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7c0e6606
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7c0e6606

Branch: refs/heads/master
Commit: 7c0e6606ac3b91817998d71f184bfc30e6ea3d84
Parents: 4422c52
Author: chenliang613 <ch...@huawei.com>
Authored: Sat Apr 8 15:07:47 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Thu May 25 11:30:21 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataConnectorFactory.java      | 12 ------
 .../carbondata/presto/CarbondataMetadata.java   | 34 ++++++++---------
 .../presto/CarbondataRecordSetProvider.java     | 12 +++---
 .../presto/impl/CarbonTableReader.java          | 40 +++++++++-----------
 4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c0e6606/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 66c007d..d1c8082 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
 
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
       CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
-      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
-      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
-      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
-      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
-      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
-      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
-      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
           classLoader
-          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
-          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
-          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c0e6606/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d938a3d..f2d594a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata {
   }
 
   public List<String> listSchemaNamesInternal() {
-    List<String> ret;
+    List<String> schemaNameList;
     try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      ret = carbonTableReader.getSchemaNames();
+      schemaNameList = carbonTableReader.getSchemaNames();
     }
-    return ret;
+    return schemaNameList;
   }
 
   @Override
@@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
   }
 
-  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
-    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
+    if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
       return null;
     }
 
-    CarbonTable cb = carbonTableReader.getTable(tableName);
-    if (cb == null) {
+    CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
+    if (carbonTable == null) {
       return null;
     }
 
-    List<ColumnMetadata> spiCols = new LinkedList<>();
-    List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+    List<ColumnMetadata> columnsMetaList = new LinkedList<>();
+    List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
-      spiCols.add(spiCol);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
+          columnType);
+      columnsMetaList.add(columnMeta);
     }
 
     //carbondata connector's table metadata
-    return new ConnectorTableMetadata(tableName, spiCols);
+    return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
   }
 
   @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
@@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
         "tableHandle is not for this connector");
 
     String schemaName = handle.getSchemaTableName().getSchemaName();
+
     if (!listSchemaNamesInternal().contains(schemaName)) {
       throw new SchemaNotFoundException(schemaName);
     }
@@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
         return DateType.DATE;
       case TIMESTAMP:
         return TimestampType.TIMESTAMP;
-
-            /*case DataType.MAP:
-            case DataType.ARRAY:
-            case DataType.STRUCT:
-            case DataType.NULL:*/
-
       default:
         return VarcharType.VARCHAR;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c0e6606/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8b087df..f0958c7 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
   @Inject
   public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    //this.config = requireNonNull(config, "config is null");
-    //this.connector = requireNonNull(connector, "connector is null");
     this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
     this.carbonTableReader = reader;
   }
@@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     requireNonNull(split, "split is null");
     requireNonNull(columns, "columns is null");
 
-    CarbondataSplit cdSplit =
+    CarbondataSplit carbondataSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+    checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
 
     String targetCols = "";
     // Convert all columns handles
@@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
 
     CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+        carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
     checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
@@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
 
     // Push down filter
-    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+    fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
 
     // Return new record set
-    return new CarbondataRecordSet(targetTable, session, cdSplit,
+    return new CarbondataRecordSet(targetTable, session, carbondataSplit,
         handles.build(), queryModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7c0e6606/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index bb482b0..b6e45d6 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CacheClient;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.thrift.TBase;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -82,7 +77,7 @@ public class CarbonTableReader {
 
   private CarbonTableConfig config;
   private List<SchemaTableName> tableList;
-  private CarbonFile dbStore;
+  private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;
 
   // A cache for Carbon reader
@@ -98,10 +93,10 @@ public class CarbonTableReader {
     if (!cc.containsKey(table)) {
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
-        if (dbStore == null) {
+        if (carbonFileList == null) {
           fileType = FileFactory.getFileType(config.getStorePath());
           try {
-            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+            carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
           } catch (Exception ex) {
             throw new RuntimeException(ex);
           }
@@ -126,11 +121,11 @@ public class CarbonTableReader {
     }
   };
 
-  public boolean updateDbStore() {
-    if (dbStore == null) {
+  public boolean updateCarbonFile() {
+    if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
       try {
-        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+        carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
       } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
@@ -139,12 +134,12 @@ public class CarbonTableReader {
   }
 
   public List<String> updateSchemaList() {
-    updateDbStore();
+    updateCarbonFile();
 
-    if (dbStore != null) {
-      List<String> scs =
-          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
-      return scs;
+    if (carbonFileList != null) {
+      List<String> schemaList =
+          Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return schemaList;
     } else return ImmutableList.of();
   }
 
@@ -154,7 +149,7 @@ public class CarbonTableReader {
   }
 
   public Set<String> updateTableList(String dbName) {
-    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
         .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -177,15 +172,14 @@ public class CarbonTableReader {
 
   public void updateSchemaTables() {
     // update logic determine later
-    if (dbStore == null) {
+    if (carbonFileList == null) {
       updateSchemaList();
     }
-
     tableList = new LinkedList<>();
-    for (CarbonFile db : dbStore.listFiles()) {
-      if (!db.getName().endsWith(".mdt")) {
-        for (CarbonFile table : db.listFiles()) {
-          tableList.add(new SchemaTableName(db.getName(), table.getName()));
+    for (CarbonFile cf : carbonFileList.listFiles()) {
+      if (!cf.getName().endsWith(".mdt")) {
+        for (CarbonFile table : cf.listFiles()) {
+          tableList.add(new SchemaTableName(cf.getName(), table.getName()));
         }
       }
     }