You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:04 UTC

[1/5] carbondata git commit: resolved rebase conflict for presto branch

Repository: carbondata
Updated Branches:
  refs/heads/master b10cf4f1c -> 5f6f1a5c4


resolved rebase conflict for presto branch


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a60775ea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a60775ea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a60775ea

Branch: refs/heads/master
Commit: a60775ea9b5949f46c620cd90443c33fcaf021ee
Parents: b10cf4f
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:34:32 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:34:32 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          | 141 ++++
 .../carbondata/presto/CarbondataMetadata.java   | 271 +++++++
 .../presto/impl/CarbonLocalInputSplit.java      |  83 +++
 .../presto/impl/CarbonTableConfig.java          |  57 ++
 .../presto/impl/CarbonTableReader.java          | 723 +++++++++++++++++++
 5 files changed, 1275 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
new file mode 100755
index 0000000..cc10165
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnHandle implements ColumnHandle {
+  private final String connectorId;
+  private final String columnName;
+
+  public boolean isInvertedIndex() {
+    return isInvertedIndex;
+  }
+
+  private final Type columnType;
+  private final int ordinalPosition;
+  private final int keyOrdinal;
+  private final int columnGroupOrdinal;
+
+  private final int columnGroupId;
+  private final String columnUniqueId;
+  private final boolean isInvertedIndex;
+
+  public boolean isMeasure() {
+    return isMeasure;
+  }
+
+  private final boolean isMeasure;
+
+  public int getKeyOrdinal() {
+    return keyOrdinal;
+  }
+
+  public int getColumnGroupOrdinal() {
+    return columnGroupOrdinal;
+  }
+
+  public int getColumnGroupId() {
+    return columnGroupId;
+  }
+
+  public String getColumnUniqueId() {
+    return columnUniqueId;
+  }
+
+  /**
+   * ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
+   * IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
+   * The columnhandle of clm3 : has ordinalposition = 3
+   */
+
+  @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
+      @JsonProperty("ordinalPosition") int ordinalPosition,
+      @JsonProperty("keyOrdinal") int keyOrdinal,
+      @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+      @JsonProperty("isMeasure") boolean isMeasure,
+      @JsonProperty("columnGroupId") int columnGroupId,
+      @JsonProperty("columnUniqueId") String columnUniqueId,
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.columnName = requireNonNull(columnName, "columnName is null");
+    this.columnType = requireNonNull(columnType, "columnType is null");
+
+    this.ordinalPosition = requireNonNull(ordinalPosition, "ordinalPosition is null");
+    this.keyOrdinal = requireNonNull(keyOrdinal, "keyOrdinal is null");
+    this.columnGroupOrdinal = requireNonNull(columnGroupOrdinal, "columnGroupOrdinal is null");
+
+    this.isMeasure = isMeasure;
+    this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
+    this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
+    this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+  }
+
+  @JsonProperty public String getConnectorId() {
+    return connectorId;
+  }
+
+  @JsonProperty public String getColumnName() {
+    return columnName;
+  }
+
+  @JsonProperty public Type getColumnType() {
+    return columnType;
+  }
+
+  @JsonProperty public int getOrdinalPosition() {
+    return ordinalPosition;
+  }
+
+  public ColumnMetadata getColumnMetadata() {
+    return new ColumnMetadata(columnName, columnType, null, false);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(connectorId, columnName);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
+    }
+
+    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+    return Objects.equals(this.connectorId, other.connectorId) && Objects
+        .equals(this.columnName, other.columnName);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
+        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
new file mode 100755
index 0000000..d2c5ab6
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataMetadata implements ConnectorMetadata {
+  private final String connectorId;
+  private CarbonTableReader carbonTableReader;
+  private ClassLoader classLoader;
+
+  private Map<String, ColumnHandle> columnHandleMap;
+
+  @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public void putClassLoader(ClassLoader classLoader) {
+    this.classLoader = classLoader;
+  }
+
+  @Override public List<String> listSchemaNames(ConnectorSession session) {
+    return listSchemaNamesInternal();
+  }
+
+  public List<String> listSchemaNamesInternal() {
+    List<String> ret;
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      ret = carbonTableReader.getSchemaNames();
+    }
+    return ret;
+  }
+
+  @Override
+  public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+
+        /*List<SchemaTableName> all = carbonTableReader.getTableList();
+        if(schemaNameOrNull != null)
+        {
+            return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
+        }
+        return all;*/
+
+    List<String> schemaNames;
+    if (schemaNameOrNull != null) {
+      schemaNames = ImmutableList.of(schemaNameOrNull);
+    } else {
+      schemaNames = carbonTableReader.getSchemaNames();
+    }
+
+    ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+    for (String schemaName : schemaNames) {
+      for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+        builder.add(new SchemaTableName(schemaName, tableName));
+      }
+    }
+    return builder.build();
+  }
+
+  @Override
+  public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
+      SchemaTablePrefix prefix) {
+    requireNonNull(prefix, "SchemaTablePrefix is null");
+
+    ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+    for (SchemaTableName tableName : listTables(session, prefix)) {
+      ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+      if (tableMetadata != null) {
+        columns.put(tableName, tableMetadata.getColumns());
+      }
+    }
+    return columns.build();
+  }
+
+  //if prefix is null. return all tables
+  //if prefix is not null, just return this table
+  private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
+    if (prefix.getSchemaName() == null) {
+      return listTables(session, prefix.getSchemaName());
+    }
+    return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+  }
+
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
+    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+      return null;
+    }
+
+    CarbonTable cb = carbonTableReader.getTable(tableName);
+    if (cb == null) {
+      return null;
+    }
+
+    List<ColumnMetadata> spiCols = new LinkedList<>();
+    List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+    for (CarbonColumn col : carbonColumns) {
+      //show columns command will return these data
+      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+      spiCols.add(spiCol);
+    }
+
+    //carbondata connector's table metadata
+    return new ConnectorTableMetadata(tableName, spiCols);
+  }
+
+  @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
+      ConnectorTableHandle tableHandle) {
+
+    CarbondataTableHandle handle =
+        checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    checkArgument(handle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
+
+    String schemaName = handle.getSchemaTableName().getSchemaName();
+    if (!listSchemaNamesInternal().contains(schemaName)) {
+      throw new SchemaNotFoundException(schemaName);
+    }
+
+    //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+    CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+    if (cb == null) {
+      throw new TableNotFoundException(handle.getSchemaTableName());
+    }
+
+    ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+    String tableName = handle.getSchemaTableName().getTableName();
+    for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+      ColumnSchema cs = column.getColumnSchema();
+
+      int complex = column.getComplexTypeOrdinal();
+      column.getNumberOfChild();
+      column.getListOfChildDimensions();
+
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
+              column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+    }
+
+    for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+      ColumnSchema cs = measure.getColumnSchema();
+
+      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      columnHandles.put(cs.getColumnName(),
+          new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
+              measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+    }
+
+    //should i cache it?
+    columnHandleMap = columnHandles.build();
+
+    return columnHandleMap;
+  }
+
+  @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
+      ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+    checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+    return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
+        .getColumnMetadata();
+  }
+
+  @Override
+  public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+    //check tablename is valid
+    //schema is exist
+    //tables is exist
+
+    //CarbondataTable  get from jar
+    return new CarbondataTableHandle(connectorId, tableName);
+  }
+
+  @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
+      ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
+      Optional<Set<ColumnHandle>> desiredColumns) {
+    CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+    ConnectorTableLayout layout = new ConnectorTableLayout(
+        new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+    return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+  }
+
+  @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
+      ConnectorTableLayoutHandle handle) {
+    return new ConnectorTableLayout(handle);
+  }
+
+  @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
+      ConnectorTableHandle table) {
+    return getTableMetadataInternal(table);
+  }
+
+  public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
+    CarbondataTableHandle carbondataTableHandle =
+        checkType(table, CarbondataTableHandle.class, "table");
+    checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
+        "tableHandle is not for this connector");
+    return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+  }
+
+  public static Type CarbondataType2SpiMapper(DataType colType) {
+    switch (colType) {
+      case BOOLEAN:
+        return BooleanType.BOOLEAN;
+      case SHORT:
+        return SmallintType.SMALLINT;
+      case INT:
+        return IntegerType.INTEGER;
+      case LONG:
+        return BigintType.BIGINT;
+      case FLOAT:
+      case DOUBLE:
+        return DoubleType.DOUBLE;
+
+      case DECIMAL:
+        return DecimalType.createDecimalType();
+      case STRING:
+        return VarcharType.VARCHAR;
+      case DATE:
+        return DateType.DATE;
+      case TIMESTAMP:
+        return TimestampType.TIMESTAMP;
+
+            /*case DataType.MAP:
+            case DataType.ARRAY:
+            case DataType.STRUCT:
+            case DataType.NULL:*/
+
+      default:
+        return VarcharType.VARCHAR;
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
new file mode 100755
index 0000000..ba8d9b5
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class CarbonLocalInputSplit {
+
+  private static final long serialVersionUID = 3520344046772190207L;
+  private String segmentId;
+  private String path;
+  private long start;
+  private long length;
+  private List<String> locations;
+  private short version;
+
+  /**
+   * Number of BlockLets in a block
+   */
+  private int numberOfBlocklets = 0;
+
+  @JsonProperty public short getVersion() {
+    return version;
+  }
+
+  @JsonProperty public List<String> getLocations() {
+    return locations;
+  }
+
+  @JsonProperty public long getLength() {
+    return length;
+  }
+
+  @JsonProperty public long getStart() {
+    return start;
+  }
+
+  @JsonProperty public String getPath() {
+    return path;
+  }
+
+  @JsonProperty public String getSegmentId() {
+    return segmentId;
+  }
+
+  @JsonProperty public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
+
+  @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+      @JsonProperty("path") String path, @JsonProperty("start") long start,
+      @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
+      @JsonProperty("numberOfBlocklets") int numberOfBlocklets/*,
+                                 @JsonProperty("tableBlockInfo") TableBlockInfo tableBlockInfo*/,
+      @JsonProperty("version") short version) {
+    this.path = path;
+    this.start = start;
+    this.length = length;
+    this.segmentId = segmentId;
+    this.locations = locations;
+    this.numberOfBlocklets = numberOfBlocklets;
+    //this.tableBlockInfo = tableBlockInfo;
+    this.version = version;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
new file mode 100755
index 0000000..a0ef63f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.NotNull;
+
+public class CarbonTableConfig {
+
+  //read from config
+  private String dbPath;
+  private String tablePath;
+  private String storePath;
+
+  @NotNull public String getDbPath() {
+    return dbPath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setDbPath(String dbPath) {
+    this.dbPath = dbPath;
+    return this;
+  }
+
+  @NotNull public String getTablePath() {
+    return tablePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+    return this;
+  }
+
+  @NotNull public String getStorePath() {
+    return storePath;
+  }
+
+  @Config("carbondata-store") public CarbonTableConfig setStorePath(String storePath) {
+    this.storePath = storePath;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
new file mode 100755
index 0000000..14ecfbc
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.*;
+import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CacheClient;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.thrift.TBase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonTableReader {
+
+  /** CarbonTableReader will be a facade of these utils
+   *
+   * 1:CarbonMetadata,(logic table)
+   * 2:FileFactory, (physic table file)
+   * 3:CarbonCommonFactory, (offer some )
+   * 4:DictionaryFactory, (parse dictionary util)
+   */
+
+  private CarbonTableConfig config;
+  private List<SchemaTableName> tableList;
+  private CarbonFile dbStore;
+  private FileFactory.FileType fileType;
+
+  //as a cache for Carbon reader
+  private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+
+  @Inject public CarbonTableReader(CarbonTableConfig config) {
+    this.config = requireNonNull(config, "CarbonTableConfig is null");
+    this.cc = new ConcurrentHashMap<>();
+  }
+
+  //for worker node to initialize carbon metastore
+  public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+    if (!cc.containsKey(table)) {
+      try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
+          FileFactory.class.getClassLoader())) {
+        if (dbStore == null) {
+          fileType = FileFactory.getFileType(config.getStorePath());
+          try {
+            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }
+      }
+      updateSchemaTables();
+      parseCarbonMetadata(table);
+    }
+
+    if (cc.containsKey(table)) return cc.get(table);
+    else return null;
+  }
+
+  public List<String> getSchemaNames() {
+    return updateSchemaList();
+  }
+
+  //default PathFilter
+  private static final PathFilter DefaultFilter = new PathFilter() {
+    @Override public boolean accept(Path path) {
+      return CarbonTablePath.isCarbonDataFile(path.getName());
+    }
+  };
+
+  public boolean updateDbStore() {
+    if (dbStore == null) {
+      fileType = FileFactory.getFileType(config.getStorePath());
+      try {
+        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    return true;
+  }
+
+  public List<String> updateSchemaList() {
+    updateDbStore();
+
+    if (dbStore != null) {
+      List<String> scs =
+          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return scs;
+    } else return ImmutableList.of();
+  }
+
+  public Set<String> getTableNames(String schema) {
+    requireNonNull(schema, "schema is null");
+    return updateTableList(schema);
+  }
+
+  public Set<String> updateTableList(String dbName) {
+    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+        .collect(Collectors.toList());
+    if (schema.size() > 0) {
+      return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+          .collect(Collectors.toSet());
+    } else return ImmutableSet.of();
+  }
+
+  public CarbonTable getTable(SchemaTableName schemaTableName) {
+    try {
+      updateSchemaTables();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    requireNonNull(schemaTableName, "schemaTableName is null");
+    CarbonTable table = loadTableMetadata(schemaTableName);
+
+    return table;
+  }
+
+  public void updateSchemaTables() {
+    //update logic determine later
+    if (dbStore == null) {
+      updateSchemaList();
+    }
+
+    tableList = new LinkedList<>();
+    for (CarbonFile db : dbStore.listFiles()) {
+      if (!db.getName().endsWith(".mdt")) {
+        for (CarbonFile table : db.listFiles()) {
+          tableList.add(new SchemaTableName(db.getName(), table.getName()));
+        }
+      }
+    }
+  }
+
+  private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+    for (SchemaTableName table : tableList) {
+      if (!table.equals(schemaTableName)) continue;
+
+      return parseCarbonMetadata(table);
+    }
+    return null;
+  }
+
+  /**
+   * parse carbon metadata into cc(CarbonTableReader cache)
+   */
+  public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+    CarbonTable result = null;
+    try {
+      CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+      if (cache.isValid()) return cache.carbonTable;
+
+      //Step1: get table meta path, load carbon table param
+      String storePath = config.getStorePath();
+      cache.carbonTableIdentifier =
+          new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+              UUID.randomUUID().toString());
+      cache.carbonTablePath =
+          PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+      cc.put(table, cache);
+
+      //Step2: check file existed? read schema file
+      ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+        public TBase create() {
+          return new org.apache.carbondata.format.TableInfo();
+        }
+      };
+      ThriftReader thriftReader =
+          new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+      thriftReader.open();
+      org.apache.carbondata.format.TableInfo tableInfo =
+          (org.apache.carbondata.format.TableInfo) thriftReader.read();
+      thriftReader.close();
+
+      //Format Level TableInfo, need transfer to Code Level TableInfo
+      SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+      TableInfo wrapperTableInfo = schemaConverter
+          .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+              storePath);
+      wrapperTableInfo.setMetaDataFilepath(
+          CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+      //load metadata info into CarbonMetadata
+      CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+      cache.tableInfo = wrapperTableInfo;
+      cache.carbonTable = CarbonMetadata.getInstance()
+          .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+      result = cache.carbonTable;
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+
+    return result;
+  }
+
+  public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+      Expression filters) throws Exception {
+
+    //处理filter, 下推filter,将应用在Segment的索引上
+    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+    CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+    List<String> invalidSegments = new ArrayList<>();
+    List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+    // get all valid segments and set them into the configuration
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+        segmentStatusManager.getValidAndInvalidSegments();
+
+    tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+    if (segments.getValidSegments().size() == 0) {
+      return new ArrayList<>(0);
+    }
+
+    // remove entry in the segment index if there are invalid segments
+    invalidSegments.addAll(segments.getInvalidSegments());
+    for (String invalidSegmentId : invalidSegments) {
+      invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+    }
+    if (invalidSegments.size() > 0) {
+      List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+          new ArrayList<>(invalidSegments.size());
+      for (String segId : invalidSegments) {
+        invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+      }
+      cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+    }
+
+    // get filter for segment
+    CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+    FilterResolverIntf filterInterface = CarbonInputFormatUtil
+        .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+    List<CarbonLocalInputSplit> result = new ArrayList<>();
+    //for each segment fetch blocks matching filter in Driver BTree
+    for (String segmentNo : tableCacheModel.segments) {
+      try {
+        List<DataRefNode> dataRefNodes =
+            getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
+                tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
+                updateStatusManager);
+        for (DataRefNode dataRefNode : dataRefNodes) {
+          BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+          TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+          if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+              updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+              updateStatusManager)) {
+            continue;
+          }
+          result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+              tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+              Arrays.asList(tableBlockInfo.getLocations()),
+              tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+              tableBlockInfo.getVersion().number()));
+        }
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+    cacheClient.close();
+    return result;
+  }
+
+  /**
+   * get data blocks of given segment
+   */
+  private List<DataRefNode> getDataBlocksOfSegment(
+      FilterExpressionProcessor filterExpressionProcessor,
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
+      FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
+      SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
+    //QueryStatistic statistic = new QueryStatistic();
+
+    //读取Segment 内部的Index
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+        getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+            updateStatusManager);
+
+    List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+
+    if (null != segmentIndexMap) {
+      // build result
+      for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+        List<DataRefNode> filterredBlocks;
+        // if no filter is given get all blocks from Btree Index
+        if (null == resolver) {
+          filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+        } else {
+          //ignore filter
+          //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+
+          // apply filter and get matching blocks
+          filterredBlocks = filterExpressionProcessor
+              .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+                  absoluteTableIdentifier);
+        }
+        resultFilterredBlocks.addAll(filterredBlocks);
+      }
+    }
+    //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+    //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
+    return resultFilterredBlocks;
+  }
+
+  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+      UpdateVO updateDetails) {
+    if (null != updateDetails.getLatestUpdateTimestamp()
+        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+        .getRefreshedTimeStamp()) {
+      return true;
+    }
+    return false;
+  }
+
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+    boolean isSegmentUpdated = false;
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+    segmentTaskIndexWrapper =
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    if (null != segmentTaskIndexWrapper) {
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+      }
+    }
+
+    // if segment tree is not loaded, load the segment tree
+    if (segmentIndexMap == null || isSegmentUpdated) {
+
+      List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+      List<String> segs = new ArrayList<>();
+      segs.add(segmentId);
+
+      FileSystem fs =
+          getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
+      List<InputSplit> splits = getSplit(fileStatusList, fs);
+
+      List<FileSplit> carbonSplits = new ArrayList<>();
+      for (InputSplit inputSplit : splits) {
+        FileSplit fileSplit = (FileSplit) inputSplit;
+        String segId = CarbonTablePath.DataPathUtil
+            .getSegmentId(fileSplit.getPath().toString());//这里的seperator应该怎么加??
+        if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+          continue;
+        }
+        carbonSplits.add(fileSplit);
+      }
+
+      List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+      for (FileSplit inputSplit : carbonSplits) {
+        if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
+            updateStatusManager, segmentId)) {
+
+          BlockletInfos blockletInfos = new BlockletInfos(0, 0,
+              0);//this level we do not need blocklet info!!!! Is this a trick?
+          tableBlockInfoList.add(
+              new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
+                  inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
+                  ColumnarFormatVersion
+                      .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//这里的null是否会异常?
+        }
+      }
+
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+      // get Btree blocks for given segment
+      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+      tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+      segmentTaskIndexWrapper =
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    }
+    return segmentIndexMap;
+  }
+
+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    String taskID = null;
+    if (null != carbonInputSplit) {
+      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+        return false;
+      }
+
+      if (null == taskKeys) {
+        return true;
+      }
+
+      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+      String bucketNo =
+          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+      String blockTimestamp = carbonInputSplit.getPath().getName()
+          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+              carbonInputSplit.getPath().getName().lastIndexOf('.'));
+      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+        if (!taskKeys.contains(taskBucketHolder)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+      throws IOException {
+
+    Iterator split = fileStatusList.iterator();
+
+    List<InputSplit> splits = new ArrayList<>();
+
+    while (true) {
+      while (true) {
+        while (split.hasNext()) {
+          FileStatus file = (FileStatus) split.next();
+          Path path = file.getPath();
+          long length = file.getLen();
+          if (length != 0L) {
+            BlockLocation[] blkLocations;
+            if (file instanceof LocatedFileStatus) {
+              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+            } else {
+              blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
+            }
+
+            if (this.isSplitable()) {
+              long blockSize1 = file.getBlockSize();
+              long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+              long bytesRemaining;
+              int blkIndex;
+              for (
+                  bytesRemaining = length;
+                  (double) bytesRemaining / (double) splitSize > 1.1D;
+                  bytesRemaining -= splitSize) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts()));
+              }
+
+              if (bytesRemaining != 0L) {
+                blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts()));
+              }
+            } else {
+              splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                  blkLocations[0].getHosts()));
+            }
+          } else {
+            splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+                new String[0]));
+          }
+        }
+        return splits;
+      }
+    }
+
+  }
+
+  private String[] getValidPartitions() {
+    //TODO: has to Identify partitions by partition pruning
+    return new String[] { "0" };
+  }
+
+  private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
+      List<FileStatus> result) throws IOException {
+    String[] partitionsToConsider = getValidPartitions();
+    if (partitionsToConsider.length == 0) {
+      throw new IOException("No partitions/data found");
+    }
+
+    FileSystem fs = null;
+
+    //PathFilter inputFilter = getDataFileFilter(job);
+
+    // get tokens for all the required FileSystem for table path
+        /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
+                job.getConfiguration());*/
+
+    //get all data files of valid partitions and segments
+    for (int i = 0; i < partitionsToConsider.length; ++i) {
+      String partition = partitionsToConsider[i];
+
+      for (int j = 0; j < segmentsToConsider.length; ++j) {
+        String segmentId = segmentsToConsider[j];
+        Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
+
+        try {
+          Configuration conf = new Configuration();
+          fs = segmentPath.getFileSystem(conf);
+          //fs.initialize(segmentPath.toUri(), conf);
+
+          RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+          while (iter.hasNext()) {
+            LocatedFileStatus stat = iter.next();
+            //if(stat.getPath().toString().contains("carbondata"))//参看carbondata的carbonInputFilter的实现
+            if (DefaultFilter.accept(stat.getPath())) {
+              if (stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+              } else {
+                result.add(stat);
+              }
+            }
+          }
+        } catch (Exception ex) {
+          System.out.println(ex.toString());
+        }
+      }
+    }
+    return fs;
+  }
+
+  protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
+      PathFilter inputFilter) throws IOException {
+    RemoteIterator iter = fs.listLocatedStatus(path);
+
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = (LocatedFileStatus) iter.next();
+      if (inputFilter.accept(stat.getPath())) {
+        if (stat.isDirectory()) {
+          this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+        } else {
+          result.add(stat);
+        }
+      }
+    }
+
+  }
+
+  /**
+   * get data blocks of given btree
+   */
+  private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+    List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+    SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+    try {
+      IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+      IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+      // Add all blocks of btree into result
+      DataRefNodeFinder blockFinder =
+          new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+      DataRefNode startBlock =
+          blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+      DataRefNode endBlock =
+          blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+      while (startBlock != endBlock) {
+        blocks.add(startBlock);
+        startBlock = startBlock.getNextDataRefNode();
+      }
+      blocks.add(endBlock);
+
+    } catch (KeyGenException e) {
+      System.out.println("Could not generate start key" + e.getMessage());
+    }
+    return blocks;
+  }
+
+  private boolean isSplitable() {
+    try {
+      // Don't split the file if it is local file system
+      if (this.fileType == FileFactory.FileType.LOCAL) {
+        return false;
+      }
+    } catch (Exception e) {
+      return true;
+    }
+    return true;
+  }
+
+  private long computeSplitSize(long blockSize, long minSize, long maxSize) {
+    return Math.max(minSize, Math.min(maxSize, blockSize));
+  }
+
+  private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+    return new FileSplit(file, start, length, hosts);
+  }
+
+  private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    for (int i = 0; i < blkLocations.length; i++) {
+      // is the offset inside this block?
+      if ((blkLocations[i].getOffset() <= offset) && (offset
+          < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
+        return i;
+      }
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException("Offset " + offset +
+        " is outside of file (0.." +
+        fileLength + ")");
+  }
+
+  /**
+   * get total number of rows. for count(*)
+   *
+   * @throws IOException
+   * @throws IndexBuilderException
+   */
+  public long getRowCount() throws IOException, IndexBuilderException {
+    long rowCount = 0;
+        /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
+
+        // no of core to load the blocks in driver
+        //addSegmentsIfEmpty(job, absoluteTableIdentifier);
+        int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+        try {
+            numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+                    .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
+        } catch (NumberFormatException e) {
+            numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+        }
+        // creating a thread pool
+        ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
+        List<Future<Map<String, AbstractIndex>>> loadedBlocks =
+                new ArrayList<Future<Map<String, AbstractIndex>>>();
+        //for each segment fetch blocks matching filter in Driver BTree
+        for (String segmentNo : this.segmentList) {
+            // submitting the task
+            loadedBlocks
+                    .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
+        }
+        threadPool.shutdown();
+        try {
+            threadPool.awaitTermination(1, TimeUnit.HOURS);
+        } catch (InterruptedException e) {
+            throw new IndexBuilderException(e);
+        }
+        try {
+            // adding all the rows of the blocks to get the total row
+            // count
+            for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
+                for (AbstractIndex abstractIndex : block.get().values()) {
+                    rowCount += abstractIndex.getTotalNumberOfRows();
+                }
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            throw new IndexBuilderException(e);
+        }*/
+    return rowCount;
+  }
+}


[5/5] carbondata git commit: resolved rebase conflict for presto conflict

Posted by ch...@apache.org.
resolved rebase conflict for presto conflict


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f6f1a5c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f6f1a5c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f6f1a5c

Branch: refs/heads/master
Commit: 5f6f1a5c443face7a3abcca836cc33a805d0dba7
Parents: 0988a84
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:43:01 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:43:01 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnHandle.java          |  25 ++-
 .../carbondata/presto/CarbondataConnector.java  |  83 +++++++++
 .../presto/CarbondataConnectorFactory.java      |   5 +-
 .../carbondata/presto/CarbondataMetadata.java   |  23 +--
 .../carbondata/presto/CarbondataModule.java     |  81 +++++++++
 .../carbondata/presto/CarbondataPageSource.java | 178 +++++++++++++++++++
 .../presto/CarbondataPageSourceProvider.java    |  50 ++++++
 .../presto/CarbondataRecordCursor.java          |  60 ++++++-
 .../carbondata/presto/CarbondataRecordSet.java  |   8 +-
 .../presto/CarbondataRecordSetProvider.java     |  12 +-
 10 files changed, 499 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index b9152b5..4a9b7ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -46,6 +46,14 @@ public class CarbondataColumnHandle implements ColumnHandle {
   private final String columnUniqueId;
   private final boolean isInvertedIndex;
 
+  /**
+   * Used when this column contains decimal data.
+   */
+  private int scale;
+
+  private int precision;
+
+
   public boolean isMeasure() {
     return isMeasure;
   }
@@ -76,7 +84,9 @@ public class CarbondataColumnHandle implements ColumnHandle {
       @JsonProperty("isMeasure") boolean isMeasure,
       @JsonProperty("columnGroupId") int columnGroupId,
       @JsonProperty("columnUniqueId") String columnUniqueId,
-      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
+      @JsonProperty("precision") int precision,
+      @JsonProperty("scale") int scale) {
     this.connectorId = requireNonNull(connectorId, "connectorId is null");
     this.columnName = requireNonNull(columnName, "columnName is null");
     this.columnType = requireNonNull(columnType, "columnType is null");
@@ -89,6 +99,8 @@ public class CarbondataColumnHandle implements ColumnHandle {
     this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
     this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
     this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+    this.precision = precision;
+    this.scale = scale;
   }
 
   @JsonProperty public String getConnectorId() {
@@ -132,4 +144,15 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
         .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
   }
+
+  @JsonProperty public int getScale() {
+    return scale;
+  }
+
+  @JsonProperty public int getPrecision() {
+    return precision;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..406ed93
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataConnector implements Connector {
+
+  private static final Logger log = Logger.get(CarbondataConnector.class);
+
+  private final LifeCycleManager lifeCycleManager;
+  private final CarbondataMetadata metadata;
+  private final ConnectorSplitManager splitManager;
+  private final ConnectorRecordSetProvider recordSetProvider;
+  private final ClassLoader classLoader;
+  private final ConnectorPageSourceProvider pageSourceProvider;
+
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
+    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.splitManager = requireNonNull(splitManager, "splitManager is null");
+    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+  }
+
+  @Override public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+      boolean readOnly) {
+    checkConnectorSupports(READ_COMMITTED, isolationLevel);
+    return CarbondataTransactionHandle.INSTANCE;
+  }
+
+  @Override public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+    metadata.putClassLoader(classLoader);
+    return metadata;
+  }
+
+  @Override public ConnectorSplitManager getSplitManager() {
+    return splitManager;
+  }
+
+  @Override public ConnectorRecordSetProvider getRecordSetProvider() {
+    return recordSetProvider;
+  }
+
+  @Override
+  public ConnectorPageSourceProvider getPageSourceProvider()
+  {
+    return pageSourceProvider;
+  }
+
+  @Override public final void shutdown() {
+    try {
+      lifeCycleManager.stop();
+    } catch (Exception e) {
+      log.error(e, "Error shutting down connector");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d1c8082..d97f19e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
 import com.facebook.presto.spi.ConnectorHandleResolver;
 import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
 import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
 import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
 import com.google.common.base.Throwables;
 import com.google.inject.Injector;
@@ -70,10 +71,12 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
+       ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          classLoader
+          classLoader,
+          new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f2d594a..7701490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -123,9 +123,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
-          columnType);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), columnType);
       columnsMetaList.add(columnMeta);
     }
 
@@ -162,21 +161,21 @@ public class CarbondataMetadata implements ConnectorMetadata {
       column.getNumberOfChild();
       column.getListOfChildDimensions();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
               column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
       ColumnSchema cs = measure.getColumnSchema();
 
-      Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+      Type spiType = CarbondataType2SpiMapper(cs);
       columnHandles.put(cs.getColumnName(),
           new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
               measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
-              cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+              cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
     }
 
     //should i cache it?
@@ -230,7 +229,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return getTableMetadata(carbondataTableHandle.getSchemaTableName());
   }
 
-  public static Type CarbondataType2SpiMapper(DataType colType) {
+  public static Type CarbondataType2SpiMapper(ColumnSchema columnSchema) {
+    DataType colType = columnSchema.getDataType();
     switch (colType) {
       case BOOLEAN:
         return BooleanType.BOOLEAN;
@@ -243,9 +243,12 @@ public class CarbondataMetadata implements ConnectorMetadata {
       case FLOAT:
       case DOUBLE:
         return DoubleType.DOUBLE;
-
       case DECIMAL:
-        return DecimalType.createDecimalType();
+        if(columnSchema.getPrecision() > 0){
+          return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+        } else {
+          return DecimalType.createDecimalType();
+        }
       case STRING:
         return VarcharType.VARCHAR;
       case DATE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
new file mode 100755
index 0000000..1d8b2b2
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+  private final String connectorId;
+  private final TypeManager typeManager;
+
+  public CarbondataModule(String connectorId, TypeManager typeManager) {
+    this.connectorId = requireNonNull(connectorId, "connector id is null");
+    this.typeManager = requireNonNull(typeManager, "typeManager is null");
+  }
+
+  @Override public void configure(Binder binder) {
+    binder.bind(TypeManager.class).toInstance(typeManager);
+
+    binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorPageSourceProvider.class).to(CarbondataPageSourceProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+    configBinder(binder).bindConfig(CarbonTableConfig.class);
+  }
+
+  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+    private final TypeManager typeManager;
+
+    @Inject public TypeDeserializer(TypeManager typeManager) {
+      super(Type.class);
+      this.typeManager = requireNonNull(typeManager, "typeManager is null");
+    }
+
+    @Override protected Type _deserialize(String value, DeserializationContext context) {
+      Type type = typeManager.getType(parseTypeSignature(value));
+      checkArgument(type != null, "Unknown type %s", value);
+      return type;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
new file mode 100644
index 0000000..7c50c66
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.math.RoundingMode.HALF_UP;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
+ */
+public class CarbondataPageSource implements ConnectorPageSource {
+
+  private static final int ROWS_PER_REQUEST = 4096;
+  private final RecordCursor cursor;
+  private final List<Type> types;
+  private final PageBuilder pageBuilder;
+  private boolean closed;
+  private final char[] buffer = new char[100];
+
+  public CarbondataPageSource(RecordSet recordSet)
+  {
+    this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+  }
+
+  public CarbondataPageSource(List<Type> types, RecordCursor cursor)
+  {
+    this.cursor = requireNonNull(cursor, "cursor is null");
+    this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+    this.pageBuilder = new PageBuilder(this.types);
+  }
+
+  public RecordCursor getCursor()
+  {
+    return cursor;
+  }
+
+  @Override public long getTotalBytes() {
+    return cursor.getTotalBytes();
+  }
+
+  @Override public long getCompletedBytes() {
+    return cursor.getCompletedBytes();
+  }
+
+  @Override public long getReadTimeNanos() {
+    return cursor.getReadTimeNanos();
+  }
+
+  @Override public boolean isFinished() {
+    return closed && pageBuilder.isEmpty();
+  }
+
+  @Override public Page getNextPage() {
+    if (!closed) {
+      int i;
+      for (i = 0; i < ROWS_PER_REQUEST; i++) {
+        if (pageBuilder.isFull()) {
+          break;
+        }
+        if (!cursor.advanceNextPosition()) {
+          closed = true;
+          break;
+        }
+
+        pageBuilder.declarePosition();
+        for (int column = 0; column < types.size(); column++) {
+          BlockBuilder output = pageBuilder.getBlockBuilder(column);
+          if (cursor.isNull(column)) {
+            output.appendNull();
+          } else {
+            Type type = types.get(column);
+            Class<?> javaType = type.getJavaType();
+            if (javaType == boolean.class) {
+              type.writeBoolean(output, cursor.getBoolean(column));
+            } else if (javaType == long.class) {
+              type.writeLong(output, cursor.getLong(column));
+            } else if (javaType == double.class) {
+              type.writeDouble(output, cursor.getDouble(column));
+            } else if (javaType == Slice.class) {
+              Slice slice = cursor.getSlice(column);
+              if(type instanceof  DecimalType)
+              {
+                if (isShortDecimal(type)) {
+                  type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
+                } else {
+                  type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
+                }
+              } else {
+                type.writeSlice(output, slice, 0, slice.length());
+              }
+            } else {
+              type.writeObject(output, cursor.getObject(column));
+            }
+          }
+        }
+      }
+    }
+
+    // only return a page if the buffer is full or we are finishing
+    if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
+      return null;
+    }
+    Page page = pageBuilder.build();
+    pageBuilder.reset();
+    return page;
+ }
+
+  @Override public long getSystemMemoryUsage() {
+    return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+  }
+
+  @Override public void close() throws IOException {
+    closed = true;
+    cursor.close();
+
+  }
+
+  private long parseLong(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return decimal.unscaledValue().longValue();
+  }
+
+
+  private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
+  {
+    BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+    return encodeUnscaledValue(decimal.unscaledValue());
+  }
+
+  private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
+  {
+    checkArgument(length < buffer.length);
+    for (int i = 0; i < length; i++) {
+      buffer[i] = (char) slice.getByte(offset + i);
+    }
+    BigDecimal decimal = new BigDecimal(buffer, 0, length);
+    checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+    decimal = decimal.setScale(type.getScale(), HALF_UP);
+    checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+    return decimal;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
new file mode 100644
index 0000000..46d8611
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider Class for Carbondata Page Source class.
+ */
+public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+
+  private CarbondataRecordSetProvider carbondataRecordSetProvider;
+
+  @Inject
+  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
+  {
+    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider is null");
+  }
+
+  @Override
+  public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    return new CarbondataPageSource(carbondataRecordSetProvider.getRecordSet(transactionHandle, session, split, columns));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index ad47f75..2e97dc0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -18,7 +18,14 @@
 package org.apache.carbondata.presto;
 
 import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.TimestampType;
 import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+
 import com.google.common.base.Strings;
 import io.airlift.log.Logger;
 import io.airlift.slice.Slice;
@@ -26,16 +33,22 @@ import io.airlift.slice.Slices;
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
 
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 
 import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
 import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
 import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
 
 public class CarbondataRecordCursor implements RecordCursor {
 
@@ -114,8 +127,10 @@ public class CarbondataRecordCursor implements RecordCursor {
 
   @Override public long getLong(int field) {
     String timeStr = getFieldValue(field);
-    Long milliSec = 0L;
-
+    Type actual = getType(field);
+    if(actual instanceof TimestampType){
+      return new Timestamp(Long.parseLong(timeStr)).getTime()/1000;
+    }
     //suppose the
     return Math.round(Double.parseDouble(getFieldValue(field)));
   }
@@ -126,8 +141,41 @@ public class CarbondataRecordCursor implements RecordCursor {
   }
 
   @Override public Slice getSlice(int field) {
-    checkFieldType(field, VARCHAR);
-    return Slices.utf8Slice(getFieldValue(field));
+    Type decimalType = getType(field);
+    if (decimalType instanceof DecimalType) {
+      DecimalType actual = (DecimalType) decimalType;
+      CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
+      if(carbondataColumnHandle.getPrecision() > 0 ) {
+        checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), carbondataColumnHandle.getScale()));
+      } else {
+        checkFieldType(field, DecimalType.createDecimalType());
+      }
+      String fieldValue = getFieldValue(field);
+      BigDecimal bigDecimalValue = new BigDecimal(fieldValue);
+      if (isShortDecimal(decimalType)) {
+        return utf8Slice(Decimals.toString(bigDecimalValue.longValue(), actual.getScale()));
+      } else {
+        if (bigDecimalValue.scale() > actual.getScale()) {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(),
+                  bigDecimalValue.scale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+        } else {
+          BigInteger unscaledDecimal =
+              rescale(bigDecimalValue.unscaledValue(), bigDecimalValue.scale(), actual.getScale());
+          Slice decimalSlice = Decimals.encodeUnscaledValue(unscaledDecimal);
+          return utf8Slice(Decimals.toString(decimalSlice, actual.getScale()));
+          //return decimalSlice;
+
+        }
+
+      }
+    } else {
+      checkFieldType(field, VARCHAR);
+      return utf8Slice(getFieldValue(field));
+    }
   }
 
   @Override public Object getObject(int field) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d3fd7a0..7bf0e84 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -99,12 +99,10 @@ public class CarbondataRecordSet implements RecordSet {
       RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
       return rc;
     } catch (QueryExecutionException e) {
-      //throw new InterruptedException(e.getMessage());
-      System.out.println(e.getMessage());
-    } catch (Exception ex) {
-      System.out.println(ex.toString());
+       throw new RuntimeException(e.getMessage(), e);
+   } catch (Exception ex) {
+      throw new RuntimeException(ex.getMessage(), ex);
     }
-    return null;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f0958c7..a9652cc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -129,7 +129,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
       Type type = cdch.getColumnType();
 
-      DataType coltype = Spi2CarbondataTypeMapper(type);
+      DataType coltype = Spi2CarbondataTypeMapper(cdch);
       Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
 
       domain = originalConstraint.getDomains().get().get(c);
@@ -200,6 +200,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         if (coltype.equals(DataType.STRING)) {
           ex = new EqualToExpression(colExpression,
               new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+          Long value = (Long) singleValues.get(0) * 1000;
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(value , coltype));
         } else ex = new EqualToExpression(colExpression,
             new LiteralExpression(singleValues.get(0), coltype));
         filters.add(ex);
@@ -241,16 +245,18 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
   }
 
-  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+  public static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
+    Type colType = carbondataColumnHandle.getColumnType();
     if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
     else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
     else if (colType == IntegerType.INTEGER) return DataType.INT;
     else if (colType == BigintType.BIGINT) return DataType.LONG;
     else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
-    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
     else if (colType == VarcharType.VARCHAR) return DataType.STRING;
     else if (colType == DateType.DATE) return DataType.DATE;
     else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else if (colType == DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(),
+        carbondataColumnHandle.getScale())) return DataType.DECIMAL;
     else return DataType.STRING;
   }
 


[4/5] carbondata git commit: refactor integration/presto

Posted by ch...@apache.org.
refactor integration/presto


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0988a847
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0988a847
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0988a847

Branch: refs/heads/master
Commit: 0988a847d4bbfe38a2741d32aaf7c9a32f7e7889
Parents: f32c503
Author: chenliang613 <ch...@huawei.com>
Authored: Sat Apr 8 15:07:47 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:11 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataConnectorFactory.java      | 12 ------
 .../carbondata/presto/CarbondataMetadata.java   | 34 ++++++++---------
 .../presto/CarbondataRecordSetProvider.java     | 12 +++---
 .../presto/impl/CarbonTableReader.java          | 40 +++++++++-----------
 4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 66c007d..d1c8082 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
 
       LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
       CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
-      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
       ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
       ConnectorRecordSetProvider connectorRecordSet =
           injector.getInstance(ConnectorRecordSetProvider.class);
-      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
-      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
-      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
-      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
-      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
-      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
 
       return new CarbondataConnector(lifeCycleManager, metadata,
           new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
-          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
           classLoader
-          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
-          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
-          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
       );
     } catch (Exception e) {
       throw Throwables.propagate(e);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d938a3d..f2d594a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata {
   }
 
   public List<String> listSchemaNamesInternal() {
-    List<String> ret;
+    List<String> schemaNameList;
     try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
-      ret = carbonTableReader.getSchemaNames();
+      schemaNameList = carbonTableReader.getSchemaNames();
     }
-    return ret;
+    return schemaNameList;
   }
 
   @Override
@@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata {
     return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
   }
 
-  private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
-    if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+  private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
+    if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
       return null;
     }
 
-    CarbonTable cb = carbonTableReader.getTable(tableName);
-    if (cb == null) {
+    CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
+    if (carbonTable == null) {
       return null;
     }
 
-    List<ColumnMetadata> spiCols = new LinkedList<>();
-    List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+    List<ColumnMetadata> columnsMetaList = new LinkedList<>();
+    List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
     for (CarbonColumn col : carbonColumns) {
       //show columns command will return these data
-      Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
-      ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
-      spiCols.add(spiCol);
+      Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+      ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
+          columnType);
+      columnsMetaList.add(columnMeta);
     }
 
     //carbondata connector's table metadata
-    return new ConnectorTableMetadata(tableName, spiCols);
+    return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
   }
 
   @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
@@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
         "tableHandle is not for this connector");
 
     String schemaName = handle.getSchemaTableName().getSchemaName();
+
     if (!listSchemaNamesInternal().contains(schemaName)) {
       throw new SchemaNotFoundException(schemaName);
     }
@@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
         return DateType.DATE;
       case TIMESTAMP:
         return TimestampType.TIMESTAMP;
-
-            /*case DataType.MAP:
-            case DataType.ARRAY:
-            case DataType.STRUCT:
-            case DataType.NULL:*/
-
       default:
         return VarcharType.VARCHAR;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8b087df..f0958c7 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
 
   @Inject
   public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
-    //this.config = requireNonNull(config, "config is null");
-    //this.connector = requireNonNull(connector, "connector is null");
     this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
     this.carbonTableReader = reader;
   }
@@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     requireNonNull(split, "split is null");
     requireNonNull(columns, "columns is null");
 
-    CarbondataSplit cdSplit =
+    CarbondataSplit carbondataSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
-    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+    checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
 
     String targetCols = "";
     // Convert all columns handles
@@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
 
     CarbonTableCacheModel tableCacheModel =
-        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+        carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
     checkNotNull(tableCacheModel, "tableCacheModel should not be null");
     checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
     checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
@@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
         QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
 
     // Push down filter
-    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+    fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
 
     // Return new record set
-    return new CarbondataRecordSet(targetTable, session, cdSplit,
+    return new CarbondataRecordSet(targetTable, session, carbondataSplit,
         handles.build(), queryModel);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index bb482b0..b6e45d6 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.hadoop.CacheClient;
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.thrift.TBase;
 
-import java.io.BufferedReader;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -82,7 +77,7 @@ public class CarbonTableReader {
 
   private CarbonTableConfig config;
   private List<SchemaTableName> tableList;
-  private CarbonFile dbStore;
+  private CarbonFile carbonFileList;
   private FileFactory.FileType fileType;
 
   // A cache for Carbon reader
@@ -98,10 +93,10 @@ public class CarbonTableReader {
     if (!cc.containsKey(table)) {
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
           FileFactory.class.getClassLoader())) {
-        if (dbStore == null) {
+        if (carbonFileList == null) {
           fileType = FileFactory.getFileType(config.getStorePath());
           try {
-            dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+            carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
           } catch (Exception ex) {
             throw new RuntimeException(ex);
           }
@@ -126,11 +121,11 @@ public class CarbonTableReader {
     }
   };
 
-  public boolean updateDbStore() {
-    if (dbStore == null) {
+  public boolean updateCarbonFile() {
+    if (carbonFileList == null) {
       fileType = FileFactory.getFileType(config.getStorePath());
       try {
-        dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+        carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
       } catch (Exception ex) {
         throw new RuntimeException(ex);
       }
@@ -139,12 +134,12 @@ public class CarbonTableReader {
   }
 
   public List<String> updateSchemaList() {
-    updateDbStore();
+    updateCarbonFile();
 
-    if (dbStore != null) {
-      List<String> scs =
-          Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
-      return scs;
+    if (carbonFileList != null) {
+      List<String> schemaList =
+          Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+      return schemaList;
     } else return ImmutableList.of();
   }
 
@@ -154,7 +149,7 @@ public class CarbonTableReader {
   }
 
   public Set<String> updateTableList(String dbName) {
-    List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+    List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
         .collect(Collectors.toList());
     if (schema.size() > 0) {
       return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -177,15 +172,14 @@ public class CarbonTableReader {
 
   public void updateSchemaTables() {
     // update logic determine later
-    if (dbStore == null) {
+    if (carbonFileList == null) {
       updateSchemaList();
     }
-
     tableList = new LinkedList<>();
-    for (CarbonFile db : dbStore.listFiles()) {
-      if (!db.getName().endsWith(".mdt")) {
-        for (CarbonFile table : db.listFiles()) {
-          tableList.add(new SchemaTableName(db.getName(), table.getName()));
+    for (CarbonFile cf : carbonFileList.listFiles()) {
+      if (!cf.getName().endsWith(".mdt")) {
+        for (CarbonFile table : cf.listFiles()) {
+          tableList.add(new SchemaTableName(cf.getName(), table.getName()));
         }
       }
     }


[2/5] carbondata git commit: resolved rebase conflict for presto branch

Posted by ch...@apache.org.
resolved rebase conflict for presto branch


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b682f54
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b682f54
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b682f54

Branch: refs/heads/master
Commit: 4b682f541976cd3e56b486e3479f47096cc1bcd9
Parents: a60775e
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:37:27 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:37:27 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataRecordSetProvider.java     | 277 +++++++++++++++++++
 1 file changed, 277 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b682f54/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..63b926f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    //this.config = requireNonNull(config, "config is null");
+    //this.connector = requireNonNull(connector, "connector is null");
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = reader;
+  }
+
+  @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+    requireNonNull(split, "split is null");
+    requireNonNull(columns, "columns is null");
+
+    // Convert split
+    CarbondataSplit cdSplit =
+        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+    String targetCols = "";
+    // Convert all columns handles
+    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    for (ColumnHandle handle : columns) {
+      handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+      targetCols += ((CarbondataColumnHandle) handle).getColumnName() + ",";
+    }
+
+    // Build column projection(check the column order)
+    if (targetCols.length() > 0) {
+      targetCols = targetCols.substring(0, targetCols.length() - 1);
+    }
+    else
+    {
+      targetCols = null;
+    }
+    //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+    CarbonTableCacheModel tableCacheModel =
+        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+    // Build Query Model
+    CarbonTable targetTable = tableCacheModel.carbonTable;
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+    QueryModel queryModel =
+        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+    // Push down filter
+    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+    // Return new record set
+    return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+        handles.build(), queryModel);
+  }
+
+  // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+  private void fillFilter2QueryModel(QueryModel queryModel,
+      TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+    //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+    //Build Predicate Expression
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpresstion for Expresstion(Carbondata)
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      DataType coltype = Spi2CarbondataTypeMapper(type);
+      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+        //new Expression()
+      }
+
+      if (domain.getValues().isAll()) {
+        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  //greater.setRangeExpression(true);
+                  rangeFilter.add(greater);
+                }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                //greater.setRangeExpression(true);
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                //less.setRangeExpression(true);
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                //less2.setRangeExpression(true);
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+          }
+        }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+            }
+          }
+        } else if (rangeFilter.size() == 1) filters.add(rangeFilter.get(0));
+      }
+    }
+
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else return;
+
+    // todo set into QueryModel
+    CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+    queryModel.setFilterExpressionResolverTree(
+        CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+  }
+
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
+}


[3/5] carbondata git commit: resloved rebase conflict for presto branch

Posted by ch...@apache.org.
resloved rebase conflict for presto branch


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f32c503a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f32c503a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f32c503a

Branch: refs/heads/master
Commit: f32c503add5a774fd91efdd3222fd76c112981f3
Parents: 4b682f5
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:42:00 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:00 2017 +0800

----------------------------------------------------------------------
 .../presto/CarbondataColumnConstraint.java      |  86 ++++++
 .../presto/CarbondataColumnHandle.java          |   6 -
 .../presto/CarbondataConnectorFactory.java      |  94 +++++++
 .../carbondata/presto/CarbondataMetadata.java   |   7 -
 .../presto/CarbondataRecordCursor.java          | 158 +++++++++++
 .../carbondata/presto/CarbondataRecordSet.java  | 110 ++++++++
 .../presto/CarbondataRecordSetProvider.java     |  14 +-
 .../presto/CarbondataSplitManager.java          | 279 +++++++++++++++++++
 .../presto/impl/CarbonTableCacheModel.java      |  44 +++
 .../presto/impl/CarbonTableConfig.java          |   3 +
 .../presto/impl/CarbonTableReader.java          |  40 ++-
 11 files changed, 793 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..82c7c78
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Encapsulating presto Tuple-domain
+ */
+public class CarbondataColumnConstraint {
+  private final String name;
+  private final boolean invertedindexed;
+  private Optional<Domain> domain;
+
+  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
+      @JsonProperty("domain") Optional<Domain> domain,
+      @JsonProperty("invertedindexed") boolean invertedindexed) {
+    this.name = requireNonNull(name, "name is null");
+    this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+    this.domain = requireNonNull(domain, "domain is null");
+  }
+
+  @JsonProperty public boolean isInvertedindexed() {
+    return invertedindexed;
+  }
+
+  @JsonProperty public String getName() {
+    return name;
+  }
+
+  @JsonProperty public Optional<Domain> getDomain() {
+    return domain;
+  }
+
+  @JsonSetter public void setDomain(Optional<Domain> domain) {
+    this.domain = domain;
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(name, domain, invertedindexed);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
+    }
+
+    CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+    return Objects.equals(this.name, other.name) && Objects.equals(this.domain, other.domain)
+        && Objects.equals(this.invertedindexed, other.invertedindexed);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("name", this.name).add("invertedindexed", this.invertedindexed)
+        .add("domain", this.domain).toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index cc10165..b9152b5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -68,12 +68,6 @@ public class CarbondataColumnHandle implements ColumnHandle {
     return columnUniqueId;
   }
 
-  /**
-   * ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
-   * IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1  from tablename)
-   * The columnhandle of clm3 : has ordinalposition = 3
-   */
-
   @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
       @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
       @JsonProperty("ordinalPosition") int ordinalPosition,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..66c007d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Build Carbondata Connector
+ * It will be called by CarbondataPlugin
+ */
+public class CarbondataConnectorFactory implements ConnectorFactory {
+
+  private final String name;
+  private final ClassLoader classLoader;
+
+  public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+    this.name = connectorName;
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public ConnectorHandleResolver getHandleResolver() {
+    return new CarbondataHandleResolver();
+  }
+
+  @Override public Connector create(String connectorId, Map<String, String> config,
+      ConnectorContext context) {
+    requireNonNull(config, "config is null");
+
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      Bootstrap app = new Bootstrap(new JsonModule(),
+          new CarbondataModule(connectorId, context.getTypeManager()));
+
+      Injector injector =
+          app.strictConfig().doNotInitializeLogging().setRequiredConfigurationProperties(config)
+              .initialize();
+
+      LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+      CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
+      ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+      ConnectorRecordSetProvider connectorRecordSet =
+          injector.getInstance(ConnectorRecordSetProvider.class);
+      //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
+
+      //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
+      //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
+      //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
+      //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
+      //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
+
+      return new CarbondataConnector(lifeCycleManager, metadata,
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
+          //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
+          classLoader
+          //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
+          //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
+          //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
+      );
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d2c5ab6..d938a3d 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -69,13 +69,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
   @Override
   public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
 
-        /*List<SchemaTableName> all = carbonTableReader.getTableList();
-        if(schemaNameOrNull != null)
-        {
-            return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
-        }
-        return all;*/
-
     List<String> schemaNames;
     if (schemaNameOrNull != null) {
       schemaNames = ImmutableList.of(schemaNameOrNull);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
new file mode 100755
index 0000000..ad47f75
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.base.Strings;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class CarbondataRecordCursor implements RecordCursor {
+
+  private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+  private final List<CarbondataColumnHandle> columnHandles;
+
+  private List<String> fields;
+  private CarbondataSplit split;
+  private CarbonIterator<Object[]> rowCursor;
+  private CarbonReadSupport<Object[]> readSupport;
+
+  private long totalBytes;
+  private long nanoStart;
+  private long nanoEnd;
+
+  public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+      CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
+      CarbondataSplit split) {
+    this.rowCursor = carbonIterator;
+    this.columnHandles = columnHandles;
+    this.readSupport = readSupport;
+    this.totalBytes = 0;
+  }
+
+  @Override public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  @Override public long getCompletedBytes() {
+    return totalBytes;
+  }
+
+  @Override public long getReadTimeNanos() {
+    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+  }
+
+  @Override public Type getType(int field) {
+
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return columnHandles.get(field).getColumnType();
+  }
+
+  /**
+   * get next Row/Page
+   */
+  @Override public boolean advanceNextPosition() {
+
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
+    }
+
+    if (rowCursor.hasNext()) {
+      Object[] columns = readSupport.readRow(rowCursor.next());
+      fields = new ArrayList<String>();
+      if(columns != null && columns.length > 0)
+      {
+        for(Object value : columns){
+          if(value != null )
+          {
+            fields.add(value.toString());
+          } else {
+            fields.add(null);
+          }
+        }
+      }
+      totalBytes += columns.length;
+      return true;
+    }
+    return false;
+  }
+
+  @Override public boolean getBoolean(int field) {
+    checkFieldType(field, BOOLEAN);
+    return Boolean.parseBoolean(getFieldValue(field));
+  }
+
+  @Override public long getLong(int field) {
+    String timeStr = getFieldValue(field);
+    Long milliSec = 0L;
+
+    //suppose the
+    return Math.round(Double.parseDouble(getFieldValue(field)));
+  }
+
+  @Override public double getDouble(int field) {
+    checkFieldType(field, DOUBLE);
+    return Double.parseDouble(getFieldValue(field));
+  }
+
+  @Override public Slice getSlice(int field) {
+    checkFieldType(field, VARCHAR);
+    return Slices.utf8Slice(getFieldValue(field));
+  }
+
+  @Override public Object getObject(int field) {
+    return null;
+  }
+
+  @Override public boolean isNull(int field) {
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return Strings.isNullOrEmpty(getFieldValue(field));
+  }
+
+  String getFieldValue(int field) {
+    checkState(fields != null, "Cursor has not been advanced yet");
+    return fields.get(field);
+  }
+
+  private void checkFieldType(int field, Type expected) {
+    Type actual = getType(field);
+    checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
+        expected, actual);
+  }
+
+  @Override public void close() {
+    nanoEnd = System.nanoTime();
+
+    //todo  delete cache from readSupport
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
new file mode 100755
index 0000000..d3fd7a0
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+
+public class CarbondataRecordSet implements RecordSet {
+
+  private CarbonTable carbonTable;
+  private TupleDomain<ColumnHandle> originalConstraint;
+  private Expression carbonConstraint;
+  private List<CarbondataColumnConstraint> rebuildConstraints;
+  private QueryModel queryModel;
+  private CarbondataSplit split;
+  private List<CarbondataColumnHandle> columns;
+  private QueryExecutor queryExecutor;
+
+  private CarbonReadSupport<Object[]> readSupport;
+
+  public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
+      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+    this.carbonTable = carbonTable;
+    this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+    this.originalConstraint = this.split.getConstraints();
+    this.rebuildConstraints = this.split.getRebuildConstraints();
+    this.queryModel = queryModel;
+    this.columns = columns;
+    this.readSupport = new DictionaryDecodeReadSupport();
+  }
+
+  //todo support later
+  private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+    return null;
+  }
+
+  @Override public List<Type> getColumnTypes() {
+    return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+  }
+
+  /**
+   * get data blocks via Carbondata QueryModel API
+   */
+  @Override public RecordCursor cursor() {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+
+    tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+        split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
+        split.getLocalInputSplit().getLocations().toArray(new String[0]),
+        split.getLocalInputSplit().getLength(),
+        //blockletInfos,
+        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+
+    queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+
+    //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+    try {
+      readSupport
+          .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+      CarbonIterator<Object[]> carbonIterator =
+          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+      return rc;
+    } catch (QueryExecutionException e) {
+      //throw new InterruptedException(e.getMessage());
+      System.out.println(e.getMessage());
+    } catch (Exception ex) {
+      System.out.println(ex.toString());
+    }
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 63b926f..8b087df 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -72,7 +72,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     requireNonNull(split, "split is null");
     requireNonNull(columns, "columns is null");
 
-    // Convert split
     CarbondataSplit cdSplit =
         checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
     checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
@@ -111,11 +110,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
     fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
 
     // Return new record set
-    return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+    return new CarbondataRecordSet(targetTable, session, cdSplit,
         handles.build(), queryModel);
   }
 
-  // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+  // Build filter for QueryModel
   private void fillFilter2QueryModel(QueryModel queryModel,
       TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
 
@@ -139,14 +138,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
       checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
 
       if (domain.getValues().isNone()) {
-        //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
-        //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
-        //new Expression()
       }
 
       if (domain.getValues().isAll()) {
-        //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
-        //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
       }
 
       List<Object> singleValues = new ArrayList<>();
@@ -166,7 +160,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
                 } else {
                   GreaterThanExpression greater = new GreaterThanExpression(colExpression,
                       new LiteralExpression(value, coltype));
-                  //greater.setRangeExpression(true);
                   rangeFilter.add(greater);
                 }
                 break;
@@ -174,7 +167,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
                 GreaterThanEqualToExpression greater =
                     new GreaterThanEqualToExpression(colExpression,
                         new LiteralExpression(value, coltype));
-                //greater.setRangeExpression(true);
                 rangeFilter.add(greater);
                 break;
               case BELOW:
@@ -191,13 +183,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
               case EXACTLY:
                 LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
                     new LiteralExpression(value, coltype));
-                //less.setRangeExpression(true);
                 rangeFilter.add(less);
                 break;
               case BELOW:
                 LessThanExpression less2 =
                     new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
-                //less2.setRangeExpression(true);
                 rangeFilter.add(less2);
                 break;
               default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
new file mode 100755
index 0000000..f3efb36
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Build Carbontable splits
+ * filtering irrelevant blocks
+ */
+public class CarbondataSplitManager implements ConnectorSplitManager {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "client is null");
+  }
+
+  public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorTableLayoutHandle layout) {
+    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+    CarbondataTableHandle tableHandle = layoutHandle.getTable();
+    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    // Packaging presto-TupleDomain into CarbondataColumnConstraint, to decouple from presto-spi Module
+    List<CarbondataColumnConstraint> rebuildConstraints =
+        getColumnConstraints(layoutHandle.getConstraint());
+
+    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+    if (cache != null) {
+      try {
+        List<CarbonLocalInputSplit> splits = carbonTableReader.getInputSplits2(cache, filters);
+
+        ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+        for (CarbonLocalInputSplit split : splits) {
+          cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
+              layoutHandle.getConstraint(), split, rebuildConstraints));
+        }
+        return new FixedSplitSource(cSplits.build());
+      } catch (Exception ex) {
+        System.out.println(ex.toString());
+      }
+    }
+    return null;
+  }
+
+  public List<CarbondataColumnConstraint> getColumnConstraints(
+      TupleDomain<ColumnHandle> constraint) {
+    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
+        .get()) {
+      CarbondataColumnHandle columnHandle =
+          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
+          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
+    }
+
+    return constraintBuilder.build();
+  }
+
+  /**
+   * Convert presto-TupleDomain predication into Carbon scan express condition
+   * @param originalConstraint  presto-TupleDomain
+   * @param carbonTable
+   * @return
+   */
+  public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
+      CarbonTable carbonTable) {
+    ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+    Domain domain = null;
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+      Optional<CarbonColumn> target =
+          ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+
+      if (target.get() == null) return null;
+
+      DataType coltype = target.get().getDataType();
+      ColumnExpression colExpression =
+          new ColumnExpression(cdch.getColumnName(), target.get().getDataType());
+      //colExpression.setColIndex(cs.getSchemaOrdinal());
+      colExpression.setDimension(target.get().isDimesion());
+      colExpression.setDimension(
+          carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+      colExpression.setCarbonColumn(target.get());
+
+      domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      if (domain.getValues().isNone()) {
+      }
+
+      if (domain.getValues().isAll()) {
+      }
+
+      List<Object> singleValues = new ArrayList<>();
+      List<Expression> rangeFilter = new ArrayList<>();
+      for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+        checkState(!range.isAll()); // Already checked
+        if (range.isSingleValue()) {
+          singleValues.add(range.getLow().getValue());
+        } else {
+          List<String> rangeConjuncts = new ArrayList<>();
+          if (!range.getLow().isLowerUnbounded()) {
+            Object value = ConvertDataByType(range.getLow().getValue(), type);
+            switch (range.getLow().getBound()) {
+              case ABOVE:
+                if (type == TimestampType.TIMESTAMP) {
+                  //todo not now
+                } else {
+                  GreaterThanExpression greater = new GreaterThanExpression(colExpression,
+                      new LiteralExpression(value, coltype));
+                  rangeFilter.add(greater);
+                }
+                break;
+              case EXACTLY:
+                GreaterThanEqualToExpression greater =
+                    new GreaterThanEqualToExpression(colExpression,
+                        new LiteralExpression(value, coltype));
+                rangeFilter.add(greater);
+                break;
+              case BELOW:
+                throw new IllegalArgumentException("Low marker should never use BELOW bound");
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+          }
+          if (!range.getHigh().isUpperUnbounded()) {
+            Object value = ConvertDataByType(range.getHigh().getValue(), type);
+            switch (range.getHigh().getBound()) {
+              case ABOVE:
+                throw new IllegalArgumentException("High marker should never use ABOVE bound");
+              case EXACTLY:
+                LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
+                    new LiteralExpression(value, coltype));
+                rangeFilter.add(less);
+                break;
+              case BELOW:
+                LessThanExpression less2 =
+                    new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+                rangeFilter.add(less2);
+                break;
+              default:
+                throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+          }
+        }
+      }
+
+      if (singleValues.size() == 1) {
+        Expression ex = null;
+        if (coltype.equals(DataType.STRING)) {
+          ex = new EqualToExpression(colExpression,
+              new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+        } else ex = new EqualToExpression(colExpression,
+            new LiteralExpression(singleValues.get(0), coltype));
+        filters.add(ex);
+      } else if (singleValues.size() > 1) {
+        ListExpression candidates = null;
+        List<Expression> exs = singleValues.stream().map((a) -> {
+          return new LiteralExpression(ConvertDataByType(a, type), coltype);
+        }).collect(Collectors.toList());
+        candidates = new ListExpression(exs);
+
+        if (candidates != null) filters.add(new InExpression(colExpression, candidates));
+      } else if (rangeFilter.size() > 0) {
+        if (rangeFilter.size() > 1) {
+          Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+          if (rangeFilter.size() > 2) {
+            for (int i = 2; i < rangeFilter.size(); i++) {
+              filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+            }
+          }
+        } else if (rangeFilter.size() == 1)//only have one value
+          filters.add(rangeFilter.get(0));
+      }
+    }
+
+    Expression finalFilters;
+    List<Expression> tmp = filters.build();
+    if (tmp.size() > 1) {
+      finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+      if (tmp.size() > 2) {
+        for (int i = 2; i < tmp.size(); i++) {
+          finalFilters = new AndExpression(finalFilters, tmp.get(i));
+        }
+      }
+    } else if (tmp.size() == 1) finalFilters = tmp.get(0);
+    else//no filter
+      return null;
+
+    return finalFilters;
+  }
+
+  /**
+   * Convert presto spi Type into Carbondata Type
+   * @param colType
+   * @return
+   */
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return new Integer((rawdata.toString()));
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
new file mode 100755
index 0000000..45755d1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Caching Carbon meta(e.g. TableIdentifier, TablePath, TableInfo, CarbonTable) in Class CarbonTableReader
+ * to speed up query
+ */
+public class CarbonTableCacheModel {
+
+  public CarbonTableIdentifier carbonTableIdentifier;
+  public CarbonTablePath carbonTablePath;
+
+  public TableInfo tableInfo;
+  public CarbonTable carbonTable;
+  public String[] segments;
+
+  public boolean isValid() {
+    if (carbonTable != null && carbonTablePath != null && carbonTableIdentifier != null)
+      return true;
+    else return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index a0ef63f..677cefd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -21,6 +21,9 @@ import io.airlift.configuration.Config;
 
 import javax.validation.constraints.NotNull;
 
+/**
+ * Configuration read from etc/catalog/carbondata.properties
+ */
 public class CarbonTableConfig {
 
   //read from config

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 14ecfbc..bb482b0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -71,22 +71,21 @@ import java.util.stream.Stream;
 
 import static java.util.Objects.requireNonNull;
 
+/** CarbonTableReader will be a facade of these utils
+ *
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ */
 public class CarbonTableReader {
 
-  /** CarbonTableReader will be a facade of these utils
-   *
-   * 1:CarbonMetadata,(logic table)
-   * 2:FileFactory, (physic table file)
-   * 3:CarbonCommonFactory, (offer some )
-   * 4:DictionaryFactory, (parse dictionary util)
-   */
-
   private CarbonTableConfig config;
   private List<SchemaTableName> tableList;
   private CarbonFile dbStore;
   private FileFactory.FileType fileType;
 
-  //as a cache for Carbon reader
+  // A cache for Carbon reader
   private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
 
   @Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -94,7 +93,7 @@ public class CarbonTableReader {
     this.cc = new ConcurrentHashMap<>();
   }
 
-  //for worker node to initialize carbon metastore
+  // for worker node to initialize carbon metastore
   public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
     if (!cc.containsKey(table)) {
       try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
@@ -120,7 +119,7 @@ public class CarbonTableReader {
     return updateSchemaList();
   }
 
-  //default PathFilter
+  // default PathFilter
   private static final PathFilter DefaultFilter = new PathFilter() {
     @Override public boolean accept(Path path) {
       return CarbonTablePath.isCarbonDataFile(path.getName());
@@ -177,7 +176,7 @@ public class CarbonTableReader {
   }
 
   public void updateSchemaTables() {
-    //update logic determine later
+    // update logic determine later
     if (dbStore == null) {
       updateSchemaList();
     }
@@ -232,14 +231,14 @@ public class CarbonTableReader {
           (org.apache.carbondata.format.TableInfo) thriftReader.read();
       thriftReader.close();
 
-      //Format Level TableInfo, need transfer to Code Level TableInfo
+      // Step3: Transform Format Level TableInfo to Code Level TableInfo
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               storePath);
       wrapperTableInfo.setMetaDataFilepath(
           CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
-      //load metadata info into CarbonMetadata
+      // Step4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
 
       cache.tableInfo = wrapperTableInfo;
@@ -256,7 +255,7 @@ public class CarbonTableReader {
   public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
       Expression filters) throws Exception {
 
-    //处理filter, 下推filter,将应用在Segment的索引上
+    // need apply filters to segment
     FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
 
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -297,7 +296,7 @@ public class CarbonTableReader {
         .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
 
     List<CarbonLocalInputSplit> result = new ArrayList<>();
-    //for each segment fetch blocks matching filter in Driver BTree
+    // for each segment fetch blocks matching filter in Driver BTree
     for (String segmentNo : tableCacheModel.segments) {
       try {
         List<DataRefNode> dataRefNodes =
@@ -338,7 +337,7 @@ public class CarbonTableReader {
     //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
     //QueryStatistic statistic = new QueryStatistic();
 
-    //读取Segment 内部的Index
+    // read segment index
     Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
         getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
             updateStatusManager);
@@ -349,13 +348,10 @@ public class CarbonTableReader {
       // build result
       for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
         List<DataRefNode> filterredBlocks;
-        // if no filter is given get all blocks from Btree Index
+        // if no filter is given, get all blocks from Btree Index
         if (null == resolver) {
           filterredBlocks = getDataBlocksOfIndex(abstractIndex);
         } else {
-          //ignore filter
-          //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-
           // apply filter and get matching blocks
           filterredBlocks = filterExpressionProcessor
               .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
@@ -568,12 +564,10 @@ public class CarbonTableReader {
         try {
           Configuration conf = new Configuration();
           fs = segmentPath.getFileSystem(conf);
-          //fs.initialize(segmentPath.toUri(), conf);
 
           RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
           while (iter.hasNext()) {
             LocatedFileStatus stat = iter.next();
-            //if(stat.getPath().toString().contains("carbondata"))//参看carbondata的carbonInputFilter的实现
             if (DefaultFilter.accept(stat.getPath())) {
               if (stat.isDirectory()) {
                 addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);