You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/05/24 10:49:04 UTC
[1/5] carbondata git commit: resolved rebase conflict for presto
branch
Repository: carbondata
Updated Branches:
refs/heads/master b10cf4f1c -> 5f6f1a5c4
resolved rebase conflict for presto branch
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a60775ea
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a60775ea
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a60775ea
Branch: refs/heads/master
Commit: a60775ea9b5949f46c620cd90443c33fcaf021ee
Parents: b10cf4f
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:34:32 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:34:32 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataColumnHandle.java | 141 ++++
.../carbondata/presto/CarbondataMetadata.java | 271 +++++++
.../presto/impl/CarbonLocalInputSplit.java | 83 +++
.../presto/impl/CarbonTableConfig.java | 57 ++
.../presto/impl/CarbonTableReader.java | 723 +++++++++++++++++++
5 files changed, 1275 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
new file mode 100755
index 0000000..cc10165
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link ColumnHandle} describing a single column of a CarbonData table.
+ *
+ * <p>Instances are built through the {@link JsonCreator} constructor. Every constructor
+ * property has a {@link JsonProperty} getter with the same explicit name, so a handle that is
+ * serialized to JSON and read back carries the same values.
+ *
+ * <p>Equality and hash code only consider the connector id and the column name.
+ */
+public class CarbondataColumnHandle implements ColumnHandle {
+  private final String connectorId;
+  private final String columnName;
+  private final Type columnType;
+
+  /**
+   * Number of the column in the entire list of columns of the table. It does not depend on the
+   * query: for {@code select clm3, clm0, clm1 from tablename} the handle of {@code clm3} still
+   * has ordinal position 3.
+   */
+  private final int ordinalPosition;
+  private final int keyOrdinal;
+  private final int columnGroupOrdinal;
+
+  private final int columnGroupId;
+  // May be null: the constructor deliberately does not enforce a value here.
+  private final String columnUniqueId;
+  private final boolean isInvertedIndex;
+  private final boolean isMeasure;
+
+  /**
+   * Creates a column handle.
+   *
+   * @param connectorId        id of the owning connector; must not be null
+   * @param columnName         name of the column; must not be null
+   * @param columnType         Presto type of the column; must not be null
+   * @param ordinalPosition    position of the column in the table's full column list
+   * @param keyOrdinal         key ordinal supplied by the caller
+   * @param columnGroupOrdinal column group ordinal supplied by the caller
+   * @param isMeasure          whether the column is a measure
+   * @param columnGroupId      column group id supplied by the caller
+   * @param columnUniqueId     unique id of the column; may be null
+   * @param isInvertedIndex    whether the column uses an inverted index
+   * @throws NullPointerException if connectorId, columnName or columnType is null
+   */
+  @JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
+      @JsonProperty("ordinalPosition") int ordinalPosition,
+      @JsonProperty("keyOrdinal") int keyOrdinal,
+      @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+      @JsonProperty("isMeasure") boolean isMeasure,
+      @JsonProperty("columnGroupId") int columnGroupId,
+      @JsonProperty("columnUniqueId") String columnUniqueId,
+      @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.columnName = requireNonNull(columnName, "columnName is null");
+    this.columnType = requireNonNull(columnType, "columnType is null");
+
+    // Primitives can never be null, so they are assigned directly instead of being boxed
+    // only to pass through requireNonNull.
+    this.ordinalPosition = ordinalPosition;
+    this.keyOrdinal = keyOrdinal;
+    this.columnGroupOrdinal = columnGroupOrdinal;
+
+    this.isMeasure = isMeasure;
+    this.columnGroupId = columnGroupId;
+    this.columnUniqueId = columnUniqueId;
+    this.isInvertedIndex = isInvertedIndex;
+  }
+
+  @JsonProperty("connectorId") public String getConnectorId() {
+    return connectorId;
+  }
+
+  @JsonProperty("columnName") public String getColumnName() {
+    return columnName;
+  }
+
+  @JsonProperty("columnType") public Type getColumnType() {
+    return columnType;
+  }
+
+  @JsonProperty("ordinalPosition") public int getOrdinalPosition() {
+    return ordinalPosition;
+  }
+
+  @JsonProperty("keyOrdinal") public int getKeyOrdinal() {
+    return keyOrdinal;
+  }
+
+  @JsonProperty("columnGroupOrdinal") public int getColumnGroupOrdinal() {
+    return columnGroupOrdinal;
+  }
+
+  @JsonProperty("columnGroupId") public int getColumnGroupId() {
+    return columnGroupId;
+  }
+
+  @JsonProperty("columnUniqueId") public String getColumnUniqueId() {
+    return columnUniqueId;
+  }
+
+  // The explicit names keep the serialized keys identical to the creator's parameter names;
+  // without them a boolean "is" getter would be exposed as "measure" / "invertedIndex".
+  @JsonProperty("isMeasure") public boolean isMeasure() {
+    return isMeasure;
+  }
+
+  @JsonProperty("isInvertedIndex") public boolean isInvertedIndex() {
+    return isInvertedIndex;
+  }
+
+  /**
+   * Builds the Presto column metadata (name and type) for this column.
+   */
+  public ColumnMetadata getColumnMetadata() {
+    return new ColumnMetadata(columnName, columnType, null, false);
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(connectorId, columnName);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if ((obj == null) || (getClass() != obj.getClass())) {
+      return false;
+    }
+
+    CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+    return Objects.equals(this.connectorId, other.connectorId) && Objects
+        .equals(this.columnName, other.columnName);
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
+        .add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
new file mode 100755
index 0000000..d2c5ab6
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataMetadata implements ConnectorMetadata {
+ private final String connectorId;
+ private CarbonTableReader carbonTableReader;
+ private ClassLoader classLoader;
+
+ private Map<String, ColumnHandle> columnHandleMap;
+
+ @Inject public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = requireNonNull(reader, "client is null");
+ }
+
+ public void putClassLoader(ClassLoader classLoader) {
+ this.classLoader = classLoader;
+ }
+
+ @Override public List<String> listSchemaNames(ConnectorSession session) {
+ return listSchemaNamesInternal();
+ }
+
+ public List<String> listSchemaNamesInternal() {
+ List<String> ret;
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+ ret = carbonTableReader.getSchemaNames();
+ }
+ return ret;
+ }
+
+ @Override
+ public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+
+ /*List<SchemaTableName> all = carbonTableReader.getTableList();
+ if(schemaNameOrNull != null)
+ {
+ return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
+ }
+ return all;*/
+
+ List<String> schemaNames;
+ if (schemaNameOrNull != null) {
+ schemaNames = ImmutableList.of(schemaNameOrNull);
+ } else {
+ schemaNames = carbonTableReader.getSchemaNames();
+ }
+
+ ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+ for (String schemaName : schemaNames) {
+ for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+ builder.add(new SchemaTableName(schemaName, tableName));
+ }
+ }
+ return builder.build();
+ }
+
+ @Override
+ public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session,
+ SchemaTablePrefix prefix) {
+ requireNonNull(prefix, "SchemaTablePrefix is null");
+
+ ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
+ for (SchemaTableName tableName : listTables(session, prefix)) {
+ ConnectorTableMetadata tableMetadata = getTableMetadata(tableName);
+ if (tableMetadata != null) {
+ columns.put(tableName, tableMetadata.getColumns());
+ }
+ }
+ return columns.build();
+ }
+
+ //if prefix is null. return all tables
+ //if prefix is not null, just return this table
+ private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix) {
+ if (prefix.getSchemaName() == null) {
+ return listTables(session, prefix.getSchemaName());
+ }
+ return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+ }
+
+ private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
+ if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+ return null;
+ }
+
+ CarbonTable cb = carbonTableReader.getTable(tableName);
+ if (cb == null) {
+ return null;
+ }
+
+ List<ColumnMetadata> spiCols = new LinkedList<>();
+ List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+ for (CarbonColumn col : carbonColumns) {
+ //show columns command will return these data
+ Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+ ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
+ spiCols.add(spiCol);
+ }
+
+ //carbondata connector's table metadata
+ return new ConnectorTableMetadata(tableName, spiCols);
+ }
+
+ @Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
+ ConnectorTableHandle tableHandle) {
+
+ CarbondataTableHandle handle =
+ checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+ checkArgument(handle.getConnectorId().equals(connectorId),
+ "tableHandle is not for this connector");
+
+ String schemaName = handle.getSchemaTableName().getSchemaName();
+ if (!listSchemaNamesInternal().contains(schemaName)) {
+ throw new SchemaNotFoundException(schemaName);
+ }
+
+ //CarbonTable(official struct) is stored in CarbonMetadata(official struct)
+ CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+ if (cb == null) {
+ throw new TableNotFoundException(handle.getSchemaTableName());
+ }
+
+ ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+ String tableName = handle.getSchemaTableName().getTableName();
+ for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+ ColumnSchema cs = column.getColumnSchema();
+
+ int complex = column.getComplexTypeOrdinal();
+ column.getNumberOfChild();
+ column.getListOfChildDimensions();
+
+ Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ columnHandles.put(cs.getColumnName(),
+ new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
+ column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ }
+
+ for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+ ColumnSchema cs = measure.getColumnSchema();
+
+ Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ columnHandles.put(cs.getColumnName(),
+ new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
+ measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ }
+
+ //should i cache it?
+ columnHandleMap = columnHandles.build();
+
+ return columnHandleMap;
+ }
+
+ @Override public ColumnMetadata getColumnMetadata(ConnectorSession session,
+ ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+ checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+ return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle")
+ .getColumnMetadata();
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ //check tablename is valid
+ //schema is exist
+ //tables is exist
+
+ //CarbondataTable get from jar
+ return new CarbondataTableHandle(connectorId, tableName);
+ }
+
+ @Override public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session,
+ ConnectorTableHandle table, Constraint<ColumnHandle> constraint,
+ Optional<Set<ColumnHandle>> desiredColumns) {
+ CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+ ConnectorTableLayout layout = new ConnectorTableLayout(
+ new CarbondataTableLayoutHandle(handle, constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override public ConnectorTableLayout getTableLayout(ConnectorSession session,
+ ConnectorTableLayoutHandle handle) {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override public ConnectorTableMetadata getTableMetadata(ConnectorSession session,
+ ConnectorTableHandle table) {
+ return getTableMetadataInternal(table);
+ }
+
+ public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table) {
+ CarbondataTableHandle carbondataTableHandle =
+ checkType(table, CarbondataTableHandle.class, "table");
+ checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId),
+ "tableHandle is not for this connector");
+ return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+ }
+
+ public static Type CarbondataType2SpiMapper(DataType colType) {
+ switch (colType) {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case SHORT:
+ return SmallintType.SMALLINT;
+ case INT:
+ return IntegerType.INTEGER;
+ case LONG:
+ return BigintType.BIGINT;
+ case FLOAT:
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+
+ case DECIMAL:
+ return DecimalType.createDecimalType();
+ case STRING:
+ return VarcharType.VARCHAR;
+ case DATE:
+ return DateType.DATE;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
+
+ /*case DataType.MAP:
+ case DataType.ARRAY:
+ case DataType.STRUCT:
+ case DataType.NULL:*/
+
+ default:
+ return VarcharType.VARCHAR;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
new file mode 100755
index 0000000..ba8d9b5
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonLocalInputSplit.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * JSON-serializable description of one input split: the file path, the byte range inside it,
+ * the owning segment, the candidate locations, the number of blocklets and the format version.
+ */
+public class CarbonLocalInputSplit {
+
+  private static final long serialVersionUID = 3520344046772190207L;
+
+  private final String segmentId;
+  private final String path;
+  private final long start;
+  private final long length;
+  private final List<String> locations;
+  private final short version;
+
+  /**
+   * Number of BlockLets in a block
+   */
+  private final int numberOfBlocklets;
+
+  /**
+   * Creates a split from its JSON properties; the values are stored as given.
+   */
+  @JsonCreator public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+      @JsonProperty("path") String path, @JsonProperty("start") long start,
+      @JsonProperty("length") long length, @JsonProperty("locations") List<String> locations,
+      @JsonProperty("numberOfBlocklets") int numberOfBlocklets,
+      @JsonProperty("version") short version) {
+    this.segmentId = segmentId;
+    this.path = path;
+    this.start = start;
+    this.length = length;
+    this.locations = locations;
+    this.numberOfBlocklets = numberOfBlocklets;
+    this.version = version;
+  }
+
+  @JsonProperty public String getSegmentId() {
+    return segmentId;
+  }
+
+  @JsonProperty public String getPath() {
+    return path;
+  }
+
+  @JsonProperty public long getStart() {
+    return start;
+  }
+
+  @JsonProperty public long getLength() {
+    return length;
+  }
+
+  @JsonProperty public List<String> getLocations() {
+    return locations;
+  }
+
+  @JsonProperty public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
+
+  @JsonProperty public short getVersion() {
+    return version;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
new file mode 100755
index 0000000..a0ef63f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Connector configuration bean holding the paths used to locate CarbonData files.
+ *
+ * <p>All three setters are bound to the same configuration key, {@code carbondata-store}.
+ */
+public class CarbonTableConfig {
+
+  //read from config
+  private String dbPath;
+  private String tablePath;
+  private String storePath;
+
+  /** Sets the database path and returns this instance for chaining. */
+  @Config("carbondata-store") public CarbonTableConfig setDbPath(String dbPath) {
+    this.dbPath = dbPath;
+    return this;
+  }
+
+  @NotNull public String getDbPath() {
+    return this.dbPath;
+  }
+
+  /** Sets the table path and returns this instance for chaining. */
+  @Config("carbondata-store") public CarbonTableConfig setTablePath(String tablePath) {
+    this.tablePath = tablePath;
+    return this;
+  }
+
+  @NotNull public String getTablePath() {
+    return this.tablePath;
+  }
+
+  /** Sets the store path and returns this instance for chaining. */
+  @Config("carbondata-store") public CarbonTableConfig setStorePath(String storePath) {
+    this.storePath = storePath;
+    return this;
+  }
+
+  @NotNull public String getStorePath() {
+    return this.storePath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/a60775ea/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
new file mode 100755
index 0000000..14ecfbc
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -0,0 +1,723 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.*;
+import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CacheClient;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.thrift.TBase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Objects.requireNonNull;
+
+public class CarbonTableReader {
+
+ /** CarbonTableReader will be a facade of these utils
+ *
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ */
+
+ // Connector configuration; its store path is used to locate the carbon store.
+ private CarbonTableConfig config;
+ // Schema/table pairs found under the store root; rebuilt by updateSchemaTables().
+ private List<SchemaTableName> tableList;
+ // Handle on the store root; created on first use (see updateDbStore() and getCarbonCache()).
+ private CarbonFile dbStore;
+ // File type resolved from the store path; assigned together with dbStore.
+ private FileFactory.FileType fileType;
+
+ //as a cache for Carbon reader
+ // Maps a table name to its cached model; entries are written by parseCarbonMetadata().
+ private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
+
+ /**
+  * Creates a reader for the given configuration, starting with an empty table cache.
+  *
+  * @param config connector configuration; must not be null
+  */
+ @Inject public CarbonTableReader(CarbonTableConfig config) {
+   this.cc = new ConcurrentHashMap<>();
+   this.config = requireNonNull(config, "CarbonTableConfig is null");
+ }
+
+ //for worker node to initialize carbon metastore
+ public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
+ if (!cc.containsKey(table)) {
+ try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
+ FileFactory.class.getClassLoader())) {
+ if (dbStore == null) {
+ fileType = FileFactory.getFileType(config.getStorePath());
+ try {
+ dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ updateSchemaTables();
+ parseCarbonMetadata(table);
+ }
+
+ if (cc.containsKey(table)) return cc.get(table);
+ else return null;
+ }
+
+ /**
+  * Returns the schema names currently found under the store root.
+  */
+ public List<String> getSchemaNames() {
+   List<String> schemaNames = updateSchemaList();
+   return schemaNames;
+ }
+
+ // Default PathFilter: accepts a path when its file name is a carbon data file name.
+ private static final PathFilter DefaultFilter =
+     path -> CarbonTablePath.isCarbonDataFile(path.getName());
+
+ /**
+  * Resolves the store root from the configured store path if that has not happened yet.
+  *
+  * @return always true
+  * @throws RuntimeException wrapping any exception raised while opening the store path
+  */
+ public boolean updateDbStore() {
+   if (dbStore != null) {
+     return true;
+   }
+
+   fileType = FileFactory.getFileType(config.getStorePath());
+   try {
+     dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+   } catch (Exception openFailure) {
+     throw new RuntimeException(openFailure);
+   }
+   return true;
+ }
+
+ /**
+  * Lists the schema names found directly under the store root.
+  *
+  * <p>Entries whose name ends with {@code .mdt} are skipped, matching what
+  * {@link #updateSchemaTables()} does: they are not schemas and must not be offered to callers
+  * that go on to list the tables of every returned schema.
+  *
+  * @return the schema names; empty when the store root is unavailable
+  */
+ public List<String> updateSchemaList() {
+   updateDbStore();
+
+   if (dbStore == null) {
+     return ImmutableList.of();
+   }
+   return Stream.of(dbStore.listFiles())
+       .map(CarbonFile::getName)
+       .filter(name -> !name.endsWith(".mdt"))
+       .collect(Collectors.toList());
+ }
+
+ public Set<String> getTableNames(String schema) {
+ requireNonNull(schema, "schema is null");
+ return updateTableList(schema);
+ }
+
+ public Set<String> updateTableList(String dbName) {
+ List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+ .collect(Collectors.toList());
+ if (schema.size() > 0) {
+ return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
+ .collect(Collectors.toSet());
+ } else return ImmutableSet.of();
+ }
+
+ /**
+  * Refreshes the schema/table list and then loads the metadata of the requested table.
+  *
+  * @param schemaTableName table to load; must not be null
+  * @return the loaded table, or null when it is not in the refreshed table list
+  * @throws RuntimeException wrapping any exception raised while refreshing the table list
+  */
+ public CarbonTable getTable(SchemaTableName schemaTableName) {
+   try {
+     updateSchemaTables();
+   } catch (Exception refreshFailure) {
+     throw new RuntimeException(refreshFailure);
+   }
+
+   return loadTableMetadata(requireNonNull(schemaTableName, "schemaTableName is null"));
+ }
+
+ public void updateSchemaTables() {
+ //update logic determine later
+ if (dbStore == null) {
+ updateSchemaList();
+ }
+
+ tableList = new LinkedList<>();
+ for (CarbonFile db : dbStore.listFiles()) {
+ if (!db.getName().endsWith(".mdt")) {
+ for (CarbonFile table : db.listFiles()) {
+ tableList.add(new SchemaTableName(db.getName(), table.getName()));
+ }
+ }
+ }
+ }
+
+ /**
+  * Loads the metadata of the first entry in {@code tableList} that equals
+  * {@code schemaTableName}.
+  *
+  * @return the parsed table, or null when the table list has no such entry
+  */
+ private CarbonTable loadTableMetadata(SchemaTableName schemaTableName) {
+   for (SchemaTableName candidate : tableList) {
+     if (candidate.equals(schemaTableName)) {
+       return parseCarbonMetadata(candidate);
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Parses the carbon metadata of {@code table} into cc (the CarbonTableReader cache).
+  *
+  * <p>A cached model that reports itself valid is returned as is. Otherwise the schema file is
+  * read, converted to the wrapper {@link TableInfo}, registered with {@link CarbonMetadata},
+  * and only then stored in the cache, so a failed load never leaves a half-initialized entry
+  * behind.
+  *
+  * @param table schema-qualified table name
+  * @return the table registered in {@link CarbonMetadata} for this table
+  * @throws RuntimeException wrapping any exception raised while reading or converting the
+  *                          schema
+  */
+ public CarbonTable parseCarbonMetadata(SchemaTableName table) {
+   try {
+     CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+     if (cache.isValid()) {
+       return cache.carbonTable;
+     }
+
+     //Step1: get table meta path, load carbon table param
+     String storePath = config.getStorePath();
+     cache.carbonTableIdentifier =
+         new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(),
+             UUID.randomUUID().toString());
+     cache.carbonTablePath =
+         PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+
+     //Step2: check file existed? read schema file
+     ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+       public TBase create() {
+         return new org.apache.carbondata.format.TableInfo();
+       }
+     };
+     String schemaFilePath = cache.carbonTablePath.getSchemaFilePath();
+     ThriftReader thriftReader = new ThriftReader(schemaFilePath, createTBase);
+     org.apache.carbondata.format.TableInfo tableInfo;
+     thriftReader.open();
+     try {
+       tableInfo = (org.apache.carbondata.format.TableInfo) thriftReader.read();
+     } finally {
+       // Release the reader even when read() fails.
+       thriftReader.close();
+     }
+
+     //Format Level TableInfo, need transfer to Code Level TableInfo
+     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+     TableInfo wrapperTableInfo = schemaConverter
+         .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+             storePath);
+     wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
+     //load metadata info into CarbonMetadata
+     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+     cache.tableInfo = wrapperTableInfo;
+     cache.carbonTable = CarbonMetadata.getInstance()
+         .getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+
+     // Publish the model only after it has been fully populated.
+     cc.put(table, cache);
+     return cache.carbonTable;
+   } catch (Exception ex) {
+     throw new RuntimeException(ex);
+   }
+ }
+
+ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
+ Expression filters) throws Exception {
+
+ //处理filter, 下推filter,将应用在Segment的索引上
+ FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+
+ AbsoluteTableIdentifier absoluteTableIdentifier =
+ tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+ CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+ // get all valid segments and set them into the configuration
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
+ segmentStatusManager.getValidAndInvalidSegments();
+
+ tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+ if (segments.getValidSegments().size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds =
+ new ArrayList<>(invalidSegments.size());
+ for (String segId : invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+ }
+ cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+ }
+
+ // get filter for segment
+ CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil
+ .resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+ List<CarbonLocalInputSplit> result = new ArrayList<>();
+ //for each segment fetch blocks matching filter in Driver BTree
+ for (String segmentNo : tableCacheModel.segments) {
+ try {
+ List<DataRefNode> dataRefNodes =
+ getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,
+ tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient,
+ updateStatusManager);
+ for (DataRefNode dataRefNode : dataRefNodes) {
+ BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+ TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+ if (CarbonUtil.isInvalidTableBlock(tableBlockInfo,
+ updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()),
+ updateStatusManager)) {
+ continue;
+ }
+ result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+ tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+ Arrays.asList(tableBlockInfo.getLocations()),
+ tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+ tableBlockInfo.getVersion().number()));
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ cacheClient.close();
+ return result;
+ }
+
+  /**
+   * Collects the data blocks of one segment, optionally pruned by a resolved filter.
+   *
+   * @param filterExpressionProcessor processor used to select blocks when a filter is given
+   * @param absoluteTableIdentifier   identifier of the table the segment belongs to
+   * @param tablePath                 table path passed on when the segment index is loaded
+   * @param resolver                  resolved filter, or {@code null} to take every block
+   * @param segmentId                 id of the segment to inspect
+   * @param cacheClient               client used to look up / load the segment index
+   * @param updateStatusManager       update status manager passed on to the index lookup
+   * @return the blocks of all indexes of the segment; empty when the segment has no index map
+   * @throws IOException if the segment index cannot be obtained
+   */
+  private List<DataRefNode> getDataBlocksOfSegment(
+      FilterExpressionProcessor filterExpressionProcessor,
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath,
+      FilterResolverIntf resolver, String segmentId, CacheClient cacheClient,
+      SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    // Read the index held inside the segment.
+    // (Driver-side query statistics recording is disabled in this version.)
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> indexesByTask =
+        getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+            updateStatusManager);
+
+    List<DataRefNode> matchedBlocks = new LinkedList<DataRefNode>();
+    if (null == indexesByTask) {
+      return matchedBlocks;
+    }
+
+    for (AbstractIndex index : indexesByTask.values()) {
+      // without a filter take all blocks of the B-tree index, otherwise only matching ones
+      List<DataRefNode> blocksOfIndex = (null == resolver)
+          ? getDataBlocksOfIndex(index)
+          : filterExpressionProcessor
+              .getFilterredBlocks(index.getDataRefNode(), resolver, index,
+                  absoluteTableIdentifier);
+      matchedBlocks.addAll(blocksOfIndex);
+    }
+    return matchedBlocks;
+  }
+
+  /**
+   * Tells whether a cached segment index is older than the latest update of the segment.
+   *
+   * @param segmentTaskIndexWrapper cached index wrapper whose refresh timestamp is compared
+   * @param updateDetails           update details of the segment
+   * @return {@code true} when a latest update timestamp exists and is greater than the
+   *         wrapper's refreshed timestamp
+   */
+  private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+      UpdateVO updateDetails) {
+    return null != updateDetails.getLatestUpdateTimestamp()
+        && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+        .getRefreshedTimeStamp();
+  }
+
+  /**
+   * Returns the task-to-index map of a segment, loading it when it is not cached or when the
+   * cached copy is older than the segment's latest update.
+   *
+   * <p>When a load is needed, the data files of the segment are listed, turned into file
+   * splits and {@code TableBlockInfo}s (skipping blocks rejected by
+   * {@code isValidBlockBasedOnUpdateDetails}), and handed to the segment access client.
+   *
+   * @param absoluteTableIdentifier identifier of the table
+   * @param tablePath               table path used to locate the segment directory
+   * @param segmentId               id of the segment
+   * @param cacheClient             client giving access to the segment index cache
+   * @param updateStatusManager     source of the segment's update details
+   * @return the index map taken from the cached or freshly loaded wrapper
+   * @throws IOException if listing files or computing splits fails
+   */
+  private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
+      AbsoluteTableIdentifier absoluteTableIdentifier, CarbonTablePath tablePath, String segmentId,
+      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws IOException {
+    Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+    boolean isSegmentUpdated = false;
+    // stays null unless a cached index exists and is stale
+    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+    TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+        new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+    SegmentTaskIndexWrapper segmentTaskIndexWrapper =
+        cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+    if (null != segmentTaskIndexWrapper) {
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+        taskKeys = segmentIndexMap.keySet();
+        isSegmentUpdated = true;
+      }
+    }
+
+    // if segment tree is not loaded, load the segment tree
+    if (segmentIndexMap == null || isSegmentUpdated) {
+
+      List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+      FileSystem fs =
+          getFileStatusOfSegments(new String[] { segmentId }, tablePath, fileStatusList);
+      List<InputSplit> splits = getSplit(fileStatusList, fs);
+
+      List<FileSplit> carbonSplits = new ArrayList<>();
+      for (InputSplit inputSplit : splits) {
+        FileSplit fileSplit = (FileSplit) inputSplit;
+        // open question from the author: how should the separator be added here?
+        String segId = CarbonTablePath.DataPathUtil
+            .getSegmentId(fileSplit.getPath().toString());
+        if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+          continue;
+        }
+        carbonSplits.add(fileSplit);
+      }
+
+      List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+      for (FileSplit inputSplit : carbonSplits) {
+        if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails,
+            updateStatusManager, segmentId)) {
+
+          // at this level blocklet info is not needed, so a zeroed placeholder is passed
+          BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);
+          // open question from the author: can the trailing null argument cause an exception?
+          tableBlockInfoList.add(
+              new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(), segmentId,
+                  inputSplit.getLocations(), inputSplit.getLength(), blockletInfos,
+                  ColumnarFormatVersion
+                      .valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION), null));
+        }
+      }
+
+      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+      // get Btree blocks for given segment
+      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+      tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+      segmentTaskIndexWrapper =
+          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+    }
+    return segmentIndexMap;
+  }
+
+  /**
+   * Decides whether the block behind a file split should be (re)loaded into the segment index.
+   *
+   * @param taskKeys            task/bucket keys already present in the cached index, or
+   *                            {@code null} when every valid block is accepted
+   * @param carbonInputSplit    split describing the block file; {@code null} yields
+   *                            {@code false}
+   * @param updateDetails       update details supplying the update delta start timestamp
+   * @param updateStatusManager used to check block validity
+   * @param segmentId           id of the segment the block belongs to
+   * @return {@code false} for a {@code null} split or a block the update status manager
+   *         reports as invalid; {@code true} when {@code taskKeys} is {@code null}; otherwise
+   *         {@code true} only if the block's timestamp is not before the update delta start
+   *         timestamp and its task/bucket is not among {@code taskKeys}
+   */
+  private boolean isValidBlockBasedOnUpdateDetails(
+      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+    if (null == carbonInputSplit) {
+      return false;
+    }
+    if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+      return false;
+    }
+    if (null == taskKeys) {
+      return true;
+    }
+
+    String fileName = carbonInputSplit.getPath().getName();
+    String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(fileName);
+    String bucketNo = CarbonTablePath.DataFileUtil.getBucketNo(fileName);
+    SegmentTaskIndexStore.TaskBucketHolder holder =
+        new SegmentTaskIndexStore.TaskBucketHolder(taskNo, bucketNo);
+
+    // the timestamp is the part of the file name between the last '-' and the last '.'
+    String blockTimestamp =
+        fileName.substring(fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.'));
+    boolean olderThanDeltaStart = updateDetails.getUpdateDeltaStartTimestamp() != null
+        && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp();
+    return !olderThanDeltaStart && !taskKeys.contains(holder);
+  }
+
+  /**
+   * Turns file statuses into input splits.
+   *
+   * <p>An empty file yields one zero-length split without hosts. A non-empty file yields a
+   * single split covering the whole file when {@code isSplitable()} is {@code false};
+   * otherwise it is cut into pieces of {@code splitSize} bytes, where the final piece may be
+   * up to 10% larger than {@code splitSize}.
+   *
+   * @param fileStatusList files to split
+   * @param targetSystem   file system queried for block locations when a status is not a
+   *                       {@code LocatedFileStatus}
+   * @return the splits, in the order of {@code fileStatusList}
+   * @throws IOException if block locations cannot be obtained
+   */
+  private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem)
+      throws IOException {
+    List<InputSplit> splits = new ArrayList<>();
+
+    for (FileStatus file : fileStatusList) {
+      Path path = file.getPath();
+      long length = file.getLen();
+      if (length == 0L) {
+        splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+            new String[0]));
+        continue;
+      }
+
+      BlockLocation[] blkLocations;
+      if (file instanceof LocatedFileStatus) {
+        blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+      } else {
+        blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
+      }
+
+      if (!this.isSplitable()) {
+        splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length,
+            blkLocations[0].getHosts()));
+        continue;
+      }
+
+      long blockSize1 = file.getBlockSize();
+      long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+      long bytesRemaining = length;
+      // keep cutting full-size splits while more than 1.1 x splitSize is left
+      while ((double) bytesRemaining / (double) splitSize > 1.1D) {
+        int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize,
+            blkLocations[blkIndex].getHosts()));
+        bytesRemaining -= splitSize;
+      }
+
+      if (bytesRemaining != 0L) {
+        int blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+        splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining,
+            blkLocations[blkIndex].getHosts()));
+      }
+    }
+    return splits;
+  }
+
+  /**
+   * Returns the partitions to scan; currently always the single partition "0".
+   *
+   * @return a new one-element array containing "0"
+   */
+  private String[] getValidPartitions() {
+    //TODO: has to Identify partitions by partition pruning
+    String[] partitions = { "0" };
+    return partitions;
+  }
+
+  /**
+   * Lists the data files of the given segments and appends them to {@code result}.
+   *
+   * <p>For every valid partition and every segment, the segment directory is listed; entries
+   * accepted by {@code DefaultFilter} are added to {@code result}, descending into
+   * directories.
+   *
+   * <p>NOTE(review): any exception raised while listing a segment is only printed to standard
+   * output and the segment is skipped, so callers cannot tell a missing segment from an empty
+   * one.
+   *
+   * @param segmentsToConsider ids of the segments to list
+   * @param tablePath          table path used to build each segment directory path
+   * @param result             output list receiving the accepted file statuses
+   * @return the file system of the last segment path that was resolved, or {@code null} when
+   *         none was resolved
+   * @throws IOException if there is no partition to consider
+   */
+  private FileSystem getFileStatusOfSegments(String[] segmentsToConsider, CarbonTablePath tablePath,
+      List<FileStatus> result) throws IOException {
+    String[] partitionsToConsider = getValidPartitions();
+    if (partitionsToConsider.length == 0) {
+      throw new IOException("No partitions/data found");
+    }
+
+    FileSystem fs = null;
+
+    // get all data files of valid partitions and segments
+    // (token acquisition and a job-specific path filter are not used in this version)
+    for (String partition : partitionsToConsider) {
+      for (String segmentId : segmentsToConsider) {
+        Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
+
+        try {
+          Configuration conf = new Configuration();
+          fs = segmentPath.getFileSystem(conf);
+
+          RemoteIterator<LocatedFileStatus> entries = fs.listLocatedStatus(segmentPath);
+          while (entries.hasNext()) {
+            LocatedFileStatus entry = entries.next();
+            // see the implementation of carbondata's carbonInputFilter
+            if (!DefaultFilter.accept(entry.getPath())) {
+              continue;
+            }
+            if (entry.isDirectory()) {
+              addInputPathRecursively(result, fs, entry.getPath(), DefaultFilter);
+            } else {
+              result.add(entry);
+            }
+          }
+        } catch (Exception ex) {
+          System.out.println(ex.toString());
+        }
+      }
+    }
+    return fs;
+  }
+
+  /**
+   * Recursively collects the files under a directory that pass the given filter.
+   *
+   * @param result      output list receiving the accepted, non-directory file statuses
+   * @param fs          file system used for listing
+   * @param path        directory to list
+   * @param inputFilter filter applied to every entry; rejected directories are not descended
+   * @throws IOException if listing fails
+   */
+  protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path,
+      PathFilter inputFilter) throws IOException {
+    RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(path);
+
+    while (iter.hasNext()) {
+      LocatedFileStatus stat = iter.next();
+      if (!inputFilter.accept(stat.getPath())) {
+        continue;
+      }
+      if (stat.isDirectory()) {
+        this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }
+  }
+
+  /**
+   * Returns every data block of the given B-tree index, from the first to the last block.
+   *
+   * <p>If the default start/end keys cannot be generated, the failure is printed to standard
+   * output and the blocks collected so far (none) are returned.
+   *
+   * @param abstractIndex index whose blocks are listed
+   * @return the blocks between the first and the last data block, both included
+   */
+  private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+    List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+    SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+    try {
+      IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+      IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+      DataRefNodeFinder blockFinder =
+          new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+      DataRefNode firstBlock =
+          blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+      DataRefNode lastBlock =
+          blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+
+      // walk the chain of blocks up to, but not including, the last one ...
+      for (DataRefNode node = firstBlock; node != lastBlock;
+          node = node.getNextDataRefNode()) {
+        blocks.add(node);
+      }
+      // ... then add the last block itself
+      blocks.add(lastBlock);
+
+    } catch (KeyGenException e) {
+      System.out.println("Could not generate start key" + e.getMessage());
+    }
+    return blocks;
+  }
+
+  /**
+   * Tells whether files may be cut into several splits.
+   *
+   * @return {@code false} when {@code fileType} is the local file system (files are then not
+   *         split), {@code true} otherwise
+   */
+  private boolean isSplitable() {
+    // Don't split the file if it is local file system
+    return this.fileType != FileFactory.FileType.LOCAL;
+  }
+
+  /**
+   * Computes the split size from the block size and the given bounds.
+   *
+   * @param blockSize block size of the file
+   * @param minSize   lower bound
+   * @param maxSize   upper bound applied to {@code blockSize} before the lower bound
+   * @return {@code max(minSize, min(maxSize, blockSize))}
+   */
+  private long computeSplitSize(long blockSize, long minSize, long maxSize) {
+    long cappedBlockSize = Math.min(maxSize, blockSize);
+    return Math.max(minSize, cappedBlockSize);
+  }
+
+  /**
+   * Creates a file split for a byte range of a file.
+   *
+   * @param file   path of the file
+   * @param start  offset of the first byte of the split
+   * @param length number of bytes in the split
+   * @param hosts  hosts passed to the split
+   * @return the new {@code FileSplit}
+   */
+  private FileSplit makeSplit(Path file, long start, long length, String[] hosts) {
+    FileSplit split = new FileSplit(file, start, length, hosts);
+    return split;
+  }
+
+  /**
+   * Finds the index of the block location that contains the given file offset.
+   *
+   * @param blkLocations block locations of the file
+   * @param offset       byte offset to locate
+   * @return index of the first location whose range {@code [offset, offset + length)}
+   *         contains {@code offset}
+   * @throws IllegalArgumentException if no location contains {@code offset}
+   */
+  private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    int index = 0;
+    for (BlockLocation location : blkLocations) {
+      long blockStart = location.getOffset();
+      long blockEnd = blockStart + location.getLength();
+      // is the offset inside this block?
+      if (blockStart <= offset && offset < blockEnd) {
+        return index;
+      }
+      index++;
+    }
+    BlockLocation last = blkLocations[blkLocations.length - 1];
+    long fileLength = last.getOffset() + last.getLength() - 1;
+    throw new IllegalArgumentException(
+        "Offset " + offset + " is outside of file (0.." + fileLength + ")");
+  }
+
+  /**
+   * get total number of rows. for count(*)
+   *
+   * NOTE: the counting logic in the body is entirely commented out, so this method currently
+   * always returns 0. The disabled code is kept below for reference.
+   *
+   * @return the row count; always 0 while the implementation is disabled
+   * @throws IOException declared, but not thrown by the current body
+   * @throws IndexBuilderException declared, but not thrown by the current body
+   */
+  public long getRowCount() throws IOException, IndexBuilderException {
+    long rowCount = 0;
+    /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
+
+    // no of core to load the blocks in driver
+    //addSegmentsIfEmpty(job, absoluteTableIdentifier);
+    int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+    try {
+    numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
+    } catch (NumberFormatException e) {
+    numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+    }
+    // creating a thread pool
+    ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
+    List<Future<Map<String, AbstractIndex>>> loadedBlocks =
+    new ArrayList<Future<Map<String, AbstractIndex>>>();
+    //for each segment fetch blocks matching filter in Driver BTree
+    for (String segmentNo : this.segmentList) {
+    // submitting the task
+    loadedBlocks
+    .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
+    }
+    threadPool.shutdown();
+    try {
+    threadPool.awaitTermination(1, TimeUnit.HOURS);
+    } catch (InterruptedException e) {
+    throw new IndexBuilderException(e);
+    }
+    try {
+    // adding all the rows of the blocks to get the total row
+    // count
+    for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
+    for (AbstractIndex abstractIndex : block.get().values()) {
+    rowCount += abstractIndex.getTotalNumberOfRows();
+    }
+    }
+    } catch (InterruptedException | ExecutionException e) {
+    throw new IndexBuilderException(e);
+    }*/
+    return rowCount;
+  }
+}
[5/5] carbondata git commit: resolved rebase conflict for presto
conflict
Posted by ch...@apache.org.
resolved rebase conflict for presto conflict
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5f6f1a5c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5f6f1a5c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5f6f1a5c
Branch: refs/heads/master
Commit: 5f6f1a5c443face7a3abcca836cc33a805d0dba7
Parents: 0988a84
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:43:01 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:43:01 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataColumnHandle.java | 25 ++-
.../carbondata/presto/CarbondataConnector.java | 83 +++++++++
.../presto/CarbondataConnectorFactory.java | 5 +-
.../carbondata/presto/CarbondataMetadata.java | 23 +--
.../carbondata/presto/CarbondataModule.java | 81 +++++++++
.../carbondata/presto/CarbondataPageSource.java | 178 +++++++++++++++++++
.../presto/CarbondataPageSourceProvider.java | 50 ++++++
.../presto/CarbondataRecordCursor.java | 60 ++++++-
.../carbondata/presto/CarbondataRecordSet.java | 8 +-
.../presto/CarbondataRecordSetProvider.java | 12 +-
10 files changed, 499 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index b9152b5..4a9b7ed 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -46,6 +46,14 @@ public class CarbondataColumnHandle implements ColumnHandle {
private final String columnUniqueId;
private final boolean isInvertedIndex;
+ /**
+ * Used when this column contains decimal data.
+ */
+ private int scale;
+
+ private int precision;
+
+
public boolean isMeasure() {
return isMeasure;
}
@@ -76,7 +84,9 @@ public class CarbondataColumnHandle implements ColumnHandle {
@JsonProperty("isMeasure") boolean isMeasure,
@JsonProperty("columnGroupId") int columnGroupId,
@JsonProperty("columnUniqueId") String columnUniqueId,
- @JsonProperty("isInvertedIndex") boolean isInvertedIndex) {
+ @JsonProperty("isInvertedIndex") boolean isInvertedIndex,
+ @JsonProperty("precision") int precision,
+ @JsonProperty("scale") int scale) {
this.connectorId = requireNonNull(connectorId, "connectorId is null");
this.columnName = requireNonNull(columnName, "columnName is null");
this.columnType = requireNonNull(columnType, "columnType is null");
@@ -89,6 +99,8 @@ public class CarbondataColumnHandle implements ColumnHandle {
this.columnGroupId = requireNonNull(columnGroupId, "columnGroupId is null");
this.columnUniqueId = columnUniqueId;//requireNonNull(columnUniqueId, "columnUniqueId is null");
this.isInvertedIndex = requireNonNull(isInvertedIndex, "isInvertedIndex is null");
+ this.precision = precision;
+ this.scale = scale;
}
@JsonProperty public String getConnectorId() {
@@ -132,4 +144,15 @@ public class CarbondataColumnHandle implements ColumnHandle {
return toStringHelper(this).add("connectorId", connectorId).add("columnName", columnName)
.add("columnType", columnType).add("ordinalPosition", ordinalPosition).toString();
}
+
+ @JsonProperty public int getScale() {
+ return scale;
+ }
+
+ @JsonProperty public int getPrecision() {
+ return precision;
+ }
+
+
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
new file mode 100755
index 0000000..406ed93
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnector.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@code Connector} for CarbonData. It hands out the metadata, split manager, record
+ * set provider and page source provider it was constructed with, and stops the life cycle
+ * manager on shutdown.
+ */
+public class CarbondataConnector implements Connector {
+
+  private static final Logger log = Logger.get(CarbondataConnector.class);
+
+  private final LifeCycleManager lifeCycleManager;
+  private final CarbondataMetadata metadata;
+  private final ConnectorSplitManager splitManager;
+  private final ConnectorRecordSetProvider recordSetProvider;
+  private final ClassLoader classLoader;
+  private final ConnectorPageSourceProvider pageSourceProvider;
+
+  /**
+   * Creates the connector from its collaborators.
+   *
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  public CarbondataConnector(LifeCycleManager lifeCycleManager, CarbondataMetadata metadata,
+      ConnectorSplitManager splitManager, ConnectorRecordSetProvider recordSetProvider,
+      ClassLoader classLoader, ConnectorPageSourceProvider pageSourceProvider) {
+    this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+    this.metadata = requireNonNull(metadata, "metadata is null");
+    this.splitManager = requireNonNull(splitManager, "splitManager is null");
+    this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
+  }
+
+  /**
+   * Checks the requested isolation level against READ_COMMITTED and returns the shared
+   * transaction handle. The {@code readOnly} flag is not consulted.
+   */
+  @Override
+  public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel,
+      boolean readOnly) {
+    checkConnectorSupports(READ_COMMITTED, isolationLevel);
+    return CarbondataTransactionHandle.INSTANCE;
+  }
+
+  /**
+   * Returns the metadata instance after passing it this connector's class loader.
+   */
+  @Override
+  public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) {
+    metadata.putClassLoader(classLoader);
+    return metadata;
+  }
+
+  /** Returns the split manager supplied at construction. */
+  @Override
+  public ConnectorSplitManager getSplitManager() {
+    return splitManager;
+  }
+
+  /** Returns the record set provider supplied at construction. */
+  @Override
+  public ConnectorRecordSetProvider getRecordSetProvider() {
+    return recordSetProvider;
+  }
+
+  /** Returns the page source provider supplied at construction. */
+  @Override
+  public ConnectorPageSourceProvider getPageSourceProvider() {
+    return pageSourceProvider;
+  }
+
+  /**
+   * Stops the life cycle manager; a failure is logged and not propagated.
+   */
+  @Override
+  public final void shutdown() {
+    try {
+      lifeCycleManager.stop();
+    } catch (Exception e) {
+      log.error(e, "Error shutting down connector");
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index d1c8082..d97f19e 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.presto;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorPageSourceProvider;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
import com.google.common.base.Throwables;
import com.google.inject.Injector;
@@ -70,10 +71,12 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
ConnectorRecordSetProvider connectorRecordSet =
injector.getInstance(ConnectorRecordSetProvider.class);
+ ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
return new CarbondataConnector(lifeCycleManager, metadata,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
- classLoader
+ classLoader,
+ new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader)
);
} catch (Exception e) {
throw Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index f2d594a..7701490 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -123,9 +123,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
for (CarbonColumn col : carbonColumns) {
//show columns command will return these data
- Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
- ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
- columnType);
+ Type columnType = CarbondataType2SpiMapper(col.getColumnSchema());
+ ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(), columnType);
columnsMetaList.add(columnMeta);
}
@@ -162,21 +161,21 @@ public class CarbondataMetadata implements ConnectorMetadata {
column.getNumberOfChild();
column.getListOfChildDimensions();
- Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ Type spiType = CarbondataType2SpiMapper(cs);
columnHandles.put(cs.getColumnName(),
new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, column.getSchemaOrdinal(),
column.getKeyOrdinal(), column.getColumnGroupOrdinal(), false, cs.getColumnGroupId(),
- cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
}
for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
ColumnSchema cs = measure.getColumnSchema();
- Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+ Type spiType = CarbondataType2SpiMapper(cs);
columnHandles.put(cs.getColumnName(),
new CarbondataColumnHandle(connectorId, cs.getColumnName(), spiType, cs.getSchemaOrdinal(),
measure.getOrdinal(), cs.getColumnGroupId(), true, cs.getColumnGroupId(),
- cs.getColumnUniqueId(), cs.isUseInvertedIndex()));
+ cs.getColumnUniqueId(), cs.isUseInvertedIndex(), cs.getPrecision(), cs.getScale()));
}
//should i cache it?
@@ -230,7 +229,8 @@ public class CarbondataMetadata implements ConnectorMetadata {
return getTableMetadata(carbondataTableHandle.getSchemaTableName());
}
- public static Type CarbondataType2SpiMapper(DataType colType) {
+ public static Type CarbondataType2SpiMapper(ColumnSchema columnSchema) {
+ DataType colType = columnSchema.getDataType();
switch (colType) {
case BOOLEAN:
return BooleanType.BOOLEAN;
@@ -243,9 +243,12 @@ public class CarbondataMetadata implements ConnectorMetadata {
case FLOAT:
case DOUBLE:
return DoubleType.DOUBLE;
-
case DECIMAL:
- return DecimalType.createDecimalType();
+ if(columnSchema.getPrecision() > 0){
+ return DecimalType.createDecimalType(columnSchema.getPrecision(), columnSchema.getScale());
+ } else {
+ return DecimalType.createDecimalType();
+ }
case STRING:
return VarcharType.VARCHAR;
case DATE:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
new file mode 100755
index 0000000..1d8b2b2
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataModule.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableConfig;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+  private final String connectorId;
+  private final TypeManager typeManager;
+
+  /**
+   * Creates the Guice module for one connector instance.
+   *
+   * @param connectorId identifier of the connector; must not be null
+   * @param typeManager type manager made available to the injector; must not be null
+   * @throws NullPointerException if either argument is null
+   */
+  public CarbondataModule(String connectorId, TypeManager typeManager) {
+    requireNonNull(connectorId, "connector id is null");
+    requireNonNull(typeManager, "typeManager is null");
+    this.connectorId = connectorId;
+    this.typeManager = typeManager;
+  }
+
+  /**
+   * Registers the connector's bindings: the supplied type manager and connector id as
+   * instances, the metadata / reader / split / record-set / page-source services as
+   * singletons, and the {@link CarbonTableConfig} configuration class.
+   *
+   * @param binder the Guice binder to configure
+   */
+  @Override public void configure(Binder binder) {
+    // Pre-built instances handed to the injector as-is.
+    binder.bind(TypeManager.class).toInstance(typeManager);
+    CarbondataConnectorId id = new CarbondataConnectorId(connectorId);
+    binder.bind(CarbondataConnectorId.class).toInstance(id);
+
+    // Connector services, one instance each per injector.
+    binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+    binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+    binder.bind(ConnectorSplitManager.class)
+        .to(CarbondataSplitManager.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorRecordSetProvider.class)
+        .to(CarbondataRecordSetProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(ConnectorPageSourceProvider.class)
+        .to(CarbondataPageSourceProvider.class)
+        .in(Scopes.SINGLETON);
+    binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+    binder.bind(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+
+    // Configuration properties for the table reader.
+    configBinder(binder).bindConfig(CarbonTableConfig.class);
+  }
+
+  /**
+   * Jackson deserializer that turns a type-signature string into a Presto {@link Type}
+   * by looking it up in the injected {@link TypeManager}.
+   */
+  public static final class TypeDeserializer extends FromStringDeserializer<Type> {
+    private final TypeManager typeManager;
+
+    /**
+     * @param typeManager type manager used for the lookup; must not be null
+     */
+    @Inject public TypeDeserializer(TypeManager typeManager) {
+      super(Type.class);
+      requireNonNull(typeManager, "typeManager is null");
+      this.typeManager = typeManager;
+    }
+
+    /**
+     * Resolves {@code value} to a type.
+     *
+     * @throws IllegalArgumentException if the type manager returns null for the signature
+     */
+    @Override protected Type _deserialize(String value, DeserializationContext context) {
+      Type resolved = typeManager.getType(parseTypeSignature(value));
+      checkArgument(resolved != null, "Unknown type %s", value);
+      return resolved;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
new file mode 100644
index 0000000..7c50c66
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSource.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.Page;
+import com.facebook.presto.spi.PageBuilder;
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Type;
+import io.airlift.slice.Slice;
+
+import static com.facebook.presto.spi.type.Decimals.encodeUnscaledValue;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.math.RoundingMode.HALF_UP;
+import static java.util.Collections.unmodifiableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Carbondata Page Source class for custom Carbondata RecordSet Iteration.
+ */
+public class CarbondataPageSource implements ConnectorPageSource {
+
+ private static final int ROWS_PER_REQUEST = 4096;
+ private final RecordCursor cursor;
+ private final List<Type> types;
+ private final PageBuilder pageBuilder;
+ private boolean closed;
+ private final char[] buffer = new char[100];
+
+ public CarbondataPageSource(RecordSet recordSet)
+ {
+ this(requireNonNull(recordSet, "recordSet is null").getColumnTypes(), recordSet.cursor());
+ }
+
+ public CarbondataPageSource(List<Type> types, RecordCursor cursor)
+ {
+ this.cursor = requireNonNull(cursor, "cursor is null");
+ this.types = unmodifiableList(new ArrayList<>(requireNonNull(types, "types is null")));
+ this.pageBuilder = new PageBuilder(this.types);
+ }
+
+ public RecordCursor getCursor()
+ {
+ return cursor;
+ }
+
+ @Override public long getTotalBytes() {
+ return cursor.getTotalBytes();
+ }
+
+ @Override public long getCompletedBytes() {
+ return cursor.getCompletedBytes();
+ }
+
+ @Override public long getReadTimeNanos() {
+ return cursor.getReadTimeNanos();
+ }
+
+ @Override public boolean isFinished() {
+ return closed && pageBuilder.isEmpty();
+ }
+
+ @Override public Page getNextPage() {
+ if (!closed) {
+ int i;
+ for (i = 0; i < ROWS_PER_REQUEST; i++) {
+ if (pageBuilder.isFull()) {
+ break;
+ }
+ if (!cursor.advanceNextPosition()) {
+ closed = true;
+ break;
+ }
+
+ pageBuilder.declarePosition();
+ for (int column = 0; column < types.size(); column++) {
+ BlockBuilder output = pageBuilder.getBlockBuilder(column);
+ if (cursor.isNull(column)) {
+ output.appendNull();
+ } else {
+ Type type = types.get(column);
+ Class<?> javaType = type.getJavaType();
+ if (javaType == boolean.class) {
+ type.writeBoolean(output, cursor.getBoolean(column));
+ } else if (javaType == long.class) {
+ type.writeLong(output, cursor.getLong(column));
+ } else if (javaType == double.class) {
+ type.writeDouble(output, cursor.getDouble(column));
+ } else if (javaType == Slice.class) {
+ Slice slice = cursor.getSlice(column);
+ if(type instanceof DecimalType)
+ {
+ if (isShortDecimal(type)) {
+ type.writeLong(output, parseLong((DecimalType) type, slice, 0, slice.length()));
+ } else {
+ type.writeSlice(output, parseSlice((DecimalType) type, slice, 0, slice.length()));
+ }
+ } else {
+ type.writeSlice(output, slice, 0, slice.length());
+ }
+ } else {
+ type.writeObject(output, cursor.getObject(column));
+ }
+ }
+ }
+ }
+ }
+
+ // only return a page if the buffer is full or we are finishing
+ if (pageBuilder.isEmpty() || (!closed && !pageBuilder.isFull())) {
+ return null;
+ }
+ Page page = pageBuilder.build();
+ pageBuilder.reset();
+ return page;
+ }
+
+ @Override public long getSystemMemoryUsage() {
+ return cursor.getSystemMemoryUsage() + pageBuilder.getSizeInBytes();
+ }
+
+ @Override public void close() throws IOException {
+ closed = true;
+ cursor.close();
+
+ }
+
+ private long parseLong(DecimalType type, Slice slice, int offset, int length)
+ {
+ BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+ return decimal.unscaledValue().longValue();
+ }
+
+
+ private Slice parseSlice(DecimalType type, Slice slice, int offset, int length)
+ {
+ BigDecimal decimal = parseBigDecimal(type, slice, offset, length);
+ return encodeUnscaledValue(decimal.unscaledValue());
+ }
+
+ private BigDecimal parseBigDecimal(DecimalType type, Slice slice, int offset, int length)
+ {
+ checkArgument(length < buffer.length);
+ for (int i = 0; i < length; i++) {
+ buffer[i] = (char) slice.getByte(offset + i);
+ }
+ BigDecimal decimal = new BigDecimal(buffer, 0, length);
+ checkState(decimal.scale() <= type.getScale(), "Read decimal value scale larger than column scale");
+ decimal = decimal.setScale(type.getScale(), HALF_UP);
+ checkState(decimal.precision() <= type.getPrecision(), "Read decimal precision larger than column precision");
+ return decimal;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
new file mode 100644
index 0000000..46d8611
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataPageSourceProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import java.util.List;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorPageSource;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.connector.ConnectorPageSourceProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.google.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Provider Class for Carbondata Page Source class.
+ */
+public class CarbondataPageSourceProvider implements ConnectorPageSourceProvider {
+
+  // Assigned once in the constructor, so it is declared final.
+  private final CarbondataRecordSetProvider carbondataRecordSetProvider;
+
+  /**
+   * @param carbondataRecordSetProvider provider used to obtain the record set for a split;
+   *                                    must not be null
+   */
+  @Inject
+  public CarbondataPageSourceProvider(CarbondataRecordSetProvider carbondataRecordSetProvider)
+  {
+    this.carbondataRecordSetProvider = requireNonNull(carbondataRecordSetProvider, "recordSetProvider is null");
+  }
+
+  /**
+   * Creates a {@link CarbondataPageSource} over the record set that the record-set
+   * provider returns for the given transaction, session, split and columns.
+   */
+  @Override
+  public ConnectorPageSource createPageSource(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns) {
+    return new CarbondataPageSource(
+        carbondataRecordSetProvider.getRecordSet(transactionHandle, session, split, columns));
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
index ad47f75..2e97dc0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -18,7 +18,14 @@
package org.apache.carbondata.presto;
import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.DecimalType;
+import com.facebook.presto.spi.type.Decimals;
+import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.block.Block;
+import com.facebook.presto.spi.block.BlockBuilder;
+import com.facebook.presto.spi.block.BlockBuilderStatus;
+
import com.google.common.base.Strings;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
@@ -26,16 +33,22 @@ import io.airlift.slice.Slices;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.Decimals.isShortDecimal;
+import static com.facebook.presto.spi.type.Decimals.rescale;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.slice.Slices.utf8Slice;
public class CarbondataRecordCursor implements RecordCursor {
@@ -114,8 +127,10 @@ public class CarbondataRecordCursor implements RecordCursor {
@Override public long getLong(int field) {
String timeStr = getFieldValue(field);
- Long milliSec = 0L;
-
+ // Timestamp columns: the stored value is parsed as a long and divided by 1000.
+ // (Wrapping it in java.sql.Timestamp and calling getTime() returned the same number.)
+ if (getType(field) instanceof TimestampType) {
+ return Long.parseLong(timeStr) / 1000;
+ }
//suppose the
return Math.round(Double.parseDouble(getFieldValue(field)));
}
@@ -126,8 +141,41 @@ public class CarbondataRecordCursor implements RecordCursor {
}
@Override public Slice getSlice(int field) {
- checkFieldType(field, VARCHAR);
- return Slices.utf8Slice(getFieldValue(field));
+ // Returns the field as UTF-8 text. Decimal columns are returned as plain decimal text
+ // rescaled to the column's scale; every other column must be VARCHAR.
+ Type fieldType = getType(field);
+ if (!(fieldType instanceof DecimalType)) {
+ checkFieldType(field, VARCHAR);
+ return utf8Slice(getFieldValue(field));
+ }
+
+ DecimalType actual = (DecimalType) fieldType;
+ CarbondataColumnHandle carbondataColumnHandle = columnHandles.get(field);
+ if (carbondataColumnHandle.getPrecision() > 0) {
+ checkFieldType(field, DecimalType.createDecimalType(carbondataColumnHandle.getPrecision(), carbondataColumnHandle.getScale()));
+ } else {
+ checkFieldType(field, DecimalType.createDecimalType());
+ }
+
+ // Rescale the value itself instead of reinterpreting a truncated long (which dropped
+ // the fractional digits) or an un-rescaled unscaled value (which shifted the decimal
+ // point). Values with more fractional digits than the column allows are rounded
+ // HALF_UP, the same mode CarbondataPageSource uses when it parses this text back.
+ BigDecimal rescaled = new BigDecimal(getFieldValue(field))
+ .setScale(actual.getScale(), java.math.RoundingMode.HALF_UP);
+ return utf8Slice(rescaled.toPlainString());
}
@Override public Object getObject(int field) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
index d3fd7a0..7bf0e84 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -99,12 +99,10 @@ public class CarbondataRecordSet implements RecordSet {
RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
return rc;
} catch (QueryExecutionException e) {
- //throw new InterruptedException(e.getMessage());
- System.out.println(e.getMessage());
- } catch (Exception ex) {
- System.out.println(ex.toString());
+ throw new RuntimeException(e.getMessage(), e);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex.getMessage(), ex);
}
- return null;
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5f6f1a5c/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index f0958c7..a9652cc 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -129,7 +129,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
Type type = cdch.getColumnType();
- DataType coltype = Spi2CarbondataTypeMapper(type);
+ DataType coltype = Spi2CarbondataTypeMapper(cdch);
Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
domain = originalConstraint.getDomains().get().get(c);
@@ -200,6 +200,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
if (coltype.equals(DataType.STRING)) {
ex = new EqualToExpression(colExpression,
new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+ } else if (coltype.equals(DataType.TIMESTAMP) || coltype.equals(DataType.DATE)) {
+ Long value = (Long) singleValues.get(0) * 1000;
+ ex = new EqualToExpression(colExpression,
+ new LiteralExpression(value , coltype));
} else ex = new EqualToExpression(colExpression,
new LiteralExpression(singleValues.get(0), coltype));
filters.add(ex);
@@ -241,16 +245,18 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
}
- public static DataType Spi2CarbondataTypeMapper(Type colType) {
+ public static DataType Spi2CarbondataTypeMapper(CarbondataColumnHandle carbondataColumnHandle) {
+ // Maps the Presto type of the column handle to the CarbonData data type;
+ // anything unrecognised falls back to STRING.
+ Type colType = carbondataColumnHandle.getColumnType();
if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
else if (colType == IntegerType.INTEGER) return DataType.INT;
else if (colType == BigintType.BIGINT) return DataType.LONG;
else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
- else if (colType == DecimalType.createDecimalType()) return DataType.DECIMAL;
else if (colType == VarcharType.VARCHAR) return DataType.STRING;
else if (colType == DateType.DATE) return DataType.DATE;
else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+ // Decimal types are parameterised, so there is no singleton to compare against with ==;
+ // a reference comparison with a freshly created DecimalType never matches. Test the
+ // class instead, which also avoids building a DecimalType from a handle whose
+ // precision is 0.
+ else if (colType instanceof DecimalType) return DataType.DECIMAL;
else return DataType.STRING;
}
[4/5] carbondata git commit: refactor integration/presto
Posted by ch...@apache.org.
refactor integration/presto
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0988a847
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0988a847
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0988a847
Branch: refs/heads/master
Commit: 0988a847d4bbfe38a2741d32aaf7c9a32f7e7889
Parents: f32c503
Author: chenliang613 <ch...@huawei.com>
Authored: Sat Apr 8 15:07:47 2017 +0530
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:11 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataConnectorFactory.java | 12 ------
.../carbondata/presto/CarbondataMetadata.java | 34 ++++++++---------
.../presto/CarbondataRecordSetProvider.java | 12 +++---
.../presto/impl/CarbonTableReader.java | 40 +++++++++-----------
4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
index 66c007d..d1c8082 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -67,25 +67,13 @@ public class CarbondataConnectorFactory implements ConnectorFactory {
LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
- //HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
ConnectorRecordSetProvider connectorRecordSet =
injector.getInstance(ConnectorRecordSetProvider.class);
- //ConnectorAccessControl accessControl = injector.getInstance(ConnectorAccessControl.class);
-
- //ConnectorPageSourceProvider connectorPageSource = injector.getInstance(ConnectorPageSourceProvider.class);
- //ConnectorPageSinkProvider pageSinkProvider = injector.getInstance(ConnectorPageSinkProvider.class);
- //ConnectorNodePartitioningProvider connectorDistributionProvider = injector.getInstance(ConnectorNodePartitioningProvider.class);
- //HiveSessionProperties hiveSessionProperties = injector.getInstance(HiveSessionProperties.class);
- //HiveTableProperties hiveTableProperties = injector.getInstance(HiveTableProperties.class);
return new CarbondataConnector(lifeCycleManager, metadata,
new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader), connectorRecordSet,
- //new ClassLoaderSafeConnectorRecordSetProvider(, classLoader),
classLoader
- //new ClassLoaderSafeConnectorPageSourceProvider(connectorPageSource, classLoader),
- //new ClassLoaderSafeConnectorPageSinkProvider(pageSinkProvider, classLoader),
- //new ClassLoaderSafeNodePartitioningProvider(connectorDistributionProvider, classLoader),
);
} catch (Exception e) {
throw Throwables.propagate(e);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d938a3d..f2d594a 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -59,11 +59,11 @@ public class CarbondataMetadata implements ConnectorMetadata {
}
public List<String> listSchemaNamesInternal() {
- List<String> ret;
+ List<String> schemaNameList;
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
- ret = carbonTableReader.getSchemaNames();
+ schemaNameList = carbonTableReader.getSchemaNames();
}
- return ret;
+ return schemaNameList;
}
@Override
@@ -109,27 +109,28 @@ public class CarbondataMetadata implements ConnectorMetadata {
return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
}
- private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName) {
- if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+ private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName) {
+ if (!listSchemaNamesInternal().contains(schemaTableName.getSchemaName())) {
return null;
}
- CarbonTable cb = carbonTableReader.getTable(tableName);
- if (cb == null) {
+ CarbonTable carbonTable = carbonTableReader.getTable(schemaTableName);
+ if (carbonTable == null) {
return null;
}
- List<ColumnMetadata> spiCols = new LinkedList<>();
- List<CarbonColumn> carbonColumns = cb.getCreateOrderColumn(tableName.getTableName());
+ List<ColumnMetadata> columnsMetaList = new LinkedList<>();
+ List<CarbonColumn> carbonColumns = carbonTable.getCreateOrderColumn(schemaTableName.getTableName());
for (CarbonColumn col : carbonColumns) {
//show columns command will return these data
- Type spiType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
- ColumnMetadata spiCol = new ColumnMetadata(col.getColumnSchema().getColumnName(), spiType);
- spiCols.add(spiCol);
+ Type columnType = CarbondataType2SpiMapper(col.getColumnSchema().getDataType());
+ ColumnMetadata columnMeta = new ColumnMetadata(col.getColumnSchema().getColumnName(),
+ columnType);
+ columnsMetaList.add(columnMeta);
}
//carbondata connector's table metadata
- return new ConnectorTableMetadata(tableName, spiCols);
+ return new ConnectorTableMetadata(schemaTableName, columnsMetaList);
}
@Override public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session,
@@ -141,6 +142,7 @@ public class CarbondataMetadata implements ConnectorMetadata {
"tableHandle is not for this connector");
String schemaName = handle.getSchemaTableName().getSchemaName();
+
if (!listSchemaNamesInternal().contains(schemaName)) {
throw new SchemaNotFoundException(schemaName);
}
@@ -250,12 +252,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
return DateType.DATE;
case TIMESTAMP:
return TimestampType.TIMESTAMP;
-
- /*case DataType.MAP:
- case DataType.ARRAY:
- case DataType.STRUCT:
- case DataType.NULL:*/
-
default:
return VarcharType.VARCHAR;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 8b087df..f0958c7 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -61,8 +61,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
@Inject
public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
- //this.config = requireNonNull(config, "config is null");
- //this.connector = requireNonNull(connector, "connector is null");
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.carbonTableReader = reader;
}
@@ -72,9 +70,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
requireNonNull(split, "split is null");
requireNonNull(columns, "columns is null");
- CarbondataSplit cdSplit =
+ CarbondataSplit carbondataSplit =
checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
- checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+ checkArgument(carbondataSplit.getConnectorId().equals(connectorId), "split is not for this connector");
String targetCols = "";
// Convert all columns handles
@@ -95,7 +93,7 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
//String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
CarbonTableCacheModel tableCacheModel =
- carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+ carbonTableReader.getCarbonCache(carbondataSplit.getSchemaTableName());
checkNotNull(tableCacheModel, "tableCacheModel should not be null");
checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
@@ -107,10 +105,10 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
// Push down filter
- fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+ fillFilter2QueryModel(queryModel, carbondataSplit.getConstraints(), targetTable);
// Return new record set
- return new CarbondataRecordSet(targetTable, session, cdSplit,
+ return new CarbondataRecordSet(targetTable, session, carbondataSplit,
handles.build(), queryModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0988a847/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index bb482b0..b6e45d6 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -52,18 +52,13 @@ import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CacheClient;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.thrift.TBase;
-import java.io.BufferedReader;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -82,7 +77,7 @@ public class CarbonTableReader {
private CarbonTableConfig config;
private List<SchemaTableName> tableList;
- private CarbonFile dbStore;
+ private CarbonFile carbonFileList;
private FileFactory.FileType fileType;
// A cache for Carbon reader
@@ -98,10 +93,10 @@ public class CarbonTableReader {
if (!cc.containsKey(table)) {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
FileFactory.class.getClassLoader())) {
- if (dbStore == null) {
+ if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
try {
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -126,11 +121,11 @@ public class CarbonTableReader {
}
};
- public boolean updateDbStore() {
- if (dbStore == null) {
+ public boolean updateCarbonFile() {
+ if (carbonFileList == null) {
fileType = FileFactory.getFileType(config.getStorePath());
try {
- dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ carbonFileList = FileFactory.getCarbonFile(config.getStorePath(), fileType);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
@@ -139,12 +134,12 @@ public class CarbonTableReader {
}
public List<String> updateSchemaList() {
- updateDbStore();
+ updateCarbonFile();
- if (dbStore != null) {
- List<String> scs =
- Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
- return scs;
+ if (carbonFileList != null) {
+ List<String> schemaList =
+ Stream.of(carbonFileList.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+ return schemaList;
} else return ImmutableList.of();
}
@@ -154,7 +149,7 @@ public class CarbonTableReader {
}
public Set<String> updateTableList(String dbName) {
- List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName()))
+ List<CarbonFile> schema = Stream.of(carbonFileList.listFiles()).filter(a -> dbName.equals(a.getName()))
.collect(Collectors.toList());
if (schema.size() > 0) {
return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName())
@@ -177,15 +172,14 @@ public class CarbonTableReader {
public void updateSchemaTables() {
// update logic determine later
- if (dbStore == null) {
+ if (carbonFileList == null) {
updateSchemaList();
}
-
tableList = new LinkedList<>();
- for (CarbonFile db : dbStore.listFiles()) {
- if (!db.getName().endsWith(".mdt")) {
- for (CarbonFile table : db.listFiles()) {
- tableList.add(new SchemaTableName(db.getName(), table.getName()));
+ for (CarbonFile cf : carbonFileList.listFiles()) {
+ if (!cf.getName().endsWith(".mdt")) {
+ for (CarbonFile table : cf.listFiles()) {
+ tableList.add(new SchemaTableName(cf.getName(), table.getName()));
}
}
}
[2/5] carbondata git commit: resolved rebase conflict for presto
branch
Posted by ch...@apache.org.
resolved rebase conflict for presto branch
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4b682f54
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4b682f54
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4b682f54
Branch: refs/heads/master
Commit: 4b682f541976cd3e56b486e3479f47096cc1bcd9
Parents: a60775e
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:37:27 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:37:27 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataRecordSetProvider.java | 277 +++++++++++++++++++
1 file changed, 277 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/4b682f54/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..63b926f
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link ConnectorRecordSetProvider} for CarbonData tables. It turns a split plus the
+ * requested columns into a {@link CarbondataRecordSet} driven by a CarbonData {@link QueryModel},
+ * and translates the split's column constraints into a CarbonData filter expression.
+ */
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  /**
+   * @param connectorId id of the connector instance this provider serves
+   * @param reader      reader used to look up the cached CarbonData table metadata
+   */
+  @Inject
+  public CarbondataRecordSetProvider(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "reader is null");
+  }
+
+  /**
+   * Builds the record set for one split.
+   *
+   * @param transactionHandle not used by this implementation
+   * @param session           session handed on to the record set
+   * @param split             must be a {@link CarbondataSplit} created for this connector
+   * @param columns           projected columns, all of type {@link CarbondataColumnHandle}
+   * @return a record set over the split's table with projection and filter applied
+   */
+  @Override public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+    requireNonNull(split, "split is null");
+    requireNonNull(columns, "columns is null");
+
+    // Convert split
+    CarbondataSplit cdSplit =
+        checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+    checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+    // Convert all column handles and collect their names in projection order
+    ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+    List<String> columnNames = new ArrayList<>(columns.size());
+    for (ColumnHandle handle : columns) {
+      CarbondataColumnHandle carbonHandle =
+          checkType(handle, CarbondataColumnHandle.class, "handle");
+      handles.add(carbonHandle);
+      columnNames.add(carbonHandle.getColumnName());
+    }
+
+    // Comma-separated projection list; null when no column is requested
+    String targetCols = columnNames.isEmpty() ? null : String.join(",", columnNames);
+
+    CarbonTableCacheModel tableCacheModel =
+        carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+    checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+    checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+    checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+    // Build Query Model
+    CarbonTable targetTable = tableCacheModel.carbonTable;
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+    QueryModel queryModel =
+        QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+    // Push down filter
+    fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+    // Return new record set
+    return new CarbondataRecordSet(targetTable, session, cdSplit, handles.build(), queryModel);
+  }
+
+  /**
+   * Translates the Presto constraint into a CarbonData filter and installs it on the query model.
+   * Per-column filters are AND-ed together. The filter is built so that it accepts every row the
+   * constraint accepts: a column whose domain cannot be expressed (null allowed, all values, no
+   * values, unbounded or unsupported ranges) contributes no filter instead of a stricter one.
+   * When nothing can be pushed down the query model is left untouched.
+   */
+  private void fillFilter2QueryModel(QueryModel queryModel,
+      TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+    // No per-column domains available: nothing to translate.
+    if (!originalConstraint.getDomains().isPresent()) {
+      return;
+    }
+
+    List<Expression> filters = new ArrayList<>();
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+      // Build ColumnExpression for Expression (Carbondata)
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+      DataType coltype = Spi2CarbondataTypeMapper(type);
+      Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+      Domain domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      // isNone(): no value ranges to translate. isAll(): the only range is unbounded.
+      // isNullAllowed(): the filters built below are plain value comparisons and do not
+      // express "OR column IS NULL", so such a domain is not pushed down.
+      if (domain.getValues().isNone() || domain.getValues().isAll() || domain.isNullAllowed()) {
+        continue;
+      }
+
+      Expression columnFilter = buildColumnFilter(domain, type, coltype, colExpression);
+      if (columnFilter != null) {
+        filters.add(columnFilter);
+      }
+    }
+
+    if (filters.isEmpty()) {
+      return;
+    }
+    Expression finalFilters = filters.get(0);
+    for (int i = 1; i < filters.size(); i++) {
+      finalFilters = new AndExpression(finalFilters, filters.get(i));
+    }
+
+    CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+    queryModel.setFilterExpressionResolverTree(
+        CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+  }
+
+  /**
+   * Builds the filter for one column: the OR of an equality/IN test over the domain's single
+   * values and one expression per non-single range.
+   *
+   * @return the combined filter, or null when the domain cannot be pushed down
+   */
+  private Expression buildColumnFilter(Domain domain, Type type, DataType coltype,
+      Expression colExpression) {
+    List<Object> singleValues = new ArrayList<>();
+    List<Expression> rangeFilters = new ArrayList<>();
+    for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+      checkState(!range.isAll()); // the caller skips domains whose value set is "all"
+      if (range.isSingleValue()) {
+        singleValues.add(range.getLow().getValue());
+        continue;
+      }
+      Expression rangeFilter = buildRangeFilter(range, type, coltype, colExpression);
+      if (rangeFilter == null) {
+        // This range has no expressible bound, so it accepts everything and OR-ing it with
+        // the other alternatives would be meaningless: give up on this column.
+        return null;
+      }
+      rangeFilters.add(rangeFilter);
+    }
+
+    List<Expression> alternatives = new ArrayList<>();
+    if (singleValues.size() == 1) {
+      alternatives.add(new EqualToExpression(colExpression,
+          new LiteralExpression(toLiteralValue(singleValues.get(0), type, coltype), coltype)));
+    } else if (singleValues.size() > 1) {
+      List<Expression> candidates = singleValues.stream()
+          .map(v -> new LiteralExpression(toLiteralValue(v, type, coltype), coltype))
+          .collect(Collectors.toList());
+      alternatives.add(new InExpression(colExpression, new ListExpression(candidates)));
+    }
+    alternatives.addAll(rangeFilters);
+
+    if (alternatives.isEmpty()) {
+      return null;
+    }
+    Expression combined = alternatives.get(0);
+    for (int i = 1; i < alternatives.size(); i++) {
+      combined = new OrExpression(combined, alternatives.get(i));
+    }
+    return combined;
+  }
+
+  /**
+   * Builds the expression for one non-single range: its lower and upper bound AND-ed together.
+   *
+   * @return the range expression, or null when neither bound could be expressed
+   */
+  private Expression buildRangeFilter(Range range, Type type, DataType coltype,
+      Expression colExpression) {
+    Expression lower = null;
+    if (!range.getLow().isLowerUnbounded()) {
+      Object value = toLiteralValue(range.getLow().getValue(), type, coltype);
+      switch (range.getLow().getBound()) {
+        case ABOVE:
+          if (type == TimestampType.TIMESTAMP) {
+            //todo not now: an exclusive lower bound on TIMESTAMP is not pushed down,
+            // which only widens the range
+          } else {
+            lower = new GreaterThanExpression(colExpression,
+                new LiteralExpression(value, coltype));
+          }
+          break;
+        case EXACTLY:
+          lower = new GreaterThanEqualToExpression(colExpression,
+              new LiteralExpression(value, coltype));
+          break;
+        case BELOW:
+          throw new IllegalArgumentException("Low marker should never use BELOW bound");
+        default:
+          throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+      }
+    }
+
+    Expression upper = null;
+    if (!range.getHigh().isUpperUnbounded()) {
+      Object value = toLiteralValue(range.getHigh().getValue(), type, coltype);
+      switch (range.getHigh().getBound()) {
+        case ABOVE:
+          throw new IllegalArgumentException("High marker should never use ABOVE bound");
+        case EXACTLY:
+          upper = new LessThanEqualToExpression(colExpression,
+              new LiteralExpression(value, coltype));
+          break;
+        case BELOW:
+          upper = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+          break;
+        default:
+          throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+      }
+    }
+
+    if (lower == null) {
+      return upper;
+    }
+    if (upper == null) {
+      return lower;
+    }
+    // Both bounds belong to the same range, so a row must satisfy both of them.
+    return new AndExpression(lower, upper);
+  }
+
+  /**
+   * Converts a raw Presto value into the object placed in a CarbonData literal: Slice values of
+   * STRING columns become Java strings, everything else goes through
+   * {@link #ConvertDataByType(Object, Type)}.
+   */
+  private Object toLiteralValue(Object rawValue, Type type, DataType coltype) {
+    if (coltype.equals(DataType.STRING) && rawValue instanceof Slice) {
+      return ((Slice) rawValue).toStringUtf8();
+    }
+    return ConvertDataByType(rawValue, type);
+  }
+
+  /**
+   * Maps a Presto SPI type to the CarbonData data type used in filter expressions.
+   *
+   * @param colType Presto column type
+   * @return the matching CarbonData type; {@code DataType.STRING} for any type not listed here
+   */
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) {
+      return DataType.BOOLEAN;
+    } else if (colType == SmallintType.SMALLINT) {
+      return DataType.SHORT;
+    } else if (colType == IntegerType.INTEGER) {
+      return DataType.INT;
+    } else if (colType == BigintType.BIGINT) {
+      return DataType.LONG;
+    } else if (colType == DoubleType.DOUBLE) {
+      return DataType.DOUBLE;
+    } else if (colType instanceof DecimalType) {
+      // Identity comparison against a freshly created decimal type is not a usable test, so
+      // match on the class: decimals of any precision/scale map to DECIMAL.
+      return DataType.DECIMAL;
+    } else if (colType == VarcharType.VARCHAR) {
+      return DataType.STRING;
+    } else if (colType == DateType.DATE) {
+      return DataType.DATE;
+    } else if (colType == TimestampType.TIMESTAMP) {
+      return DataType.TIMESTAMP;
+    } else {
+      return DataType.STRING;
+    }
+  }
+
+  /**
+   * Converts a raw Presto value to the Java object used for the given Presto type.
+   *
+   * @param rawdata value as delivered by the Presto domain
+   * @param type    Presto type of the column
+   * @return Integer for INTEGER, Long for BIGINT, String for VARCHAR, Boolean for BOOLEAN;
+   *         any other type is returned unchanged
+   */
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) {
+      return Integer.valueOf(rawdata.toString());
+    } else if (type.equals(BigintType.BIGINT)) {
+      return (Long) rawdata;
+    } else if (type.equals(VarcharType.VARCHAR)) {
+      return ((Slice) rawdata).toStringUtf8();
+    } else if (type.equals(BooleanType.BOOLEAN)) {
+      return (Boolean) (rawdata);
+    }
+    return rawdata;
+  }
+}
[3/5] carbondata git commit: resloved rebase conflict for presto
branch
Posted by ch...@apache.org.
resloved rebase conflict for presto branch
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f32c503a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f32c503a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f32c503a
Branch: refs/heads/master
Commit: f32c503add5a774fd91efdd3222fd76c112981f3
Parents: 4b682f5
Author: chenliang613 <ch...@apache.org>
Authored: Wed May 24 18:42:00 2017 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Wed May 24 18:42:00 2017 +0800
----------------------------------------------------------------------
.../presto/CarbondataColumnConstraint.java | 86 ++++++
.../presto/CarbondataColumnHandle.java | 6 -
.../presto/CarbondataConnectorFactory.java | 94 +++++++
.../carbondata/presto/CarbondataMetadata.java | 7 -
.../presto/CarbondataRecordCursor.java | 158 +++++++++++
.../carbondata/presto/CarbondataRecordSet.java | 110 ++++++++
.../presto/CarbondataRecordSetProvider.java | 14 +-
.../presto/CarbondataSplitManager.java | 279 +++++++++++++++++++
.../presto/impl/CarbonTableCacheModel.java | 44 +++
.../presto/impl/CarbonTableConfig.java | 3 +
.../presto/impl/CarbonTableReader.java | 40 ++-
11 files changed, 793 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..82c7c78
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnConstraint.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+//import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Objects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Holds the Presto {@link Domain} attached to one column, identified by name, together with a
+ * flag telling whether the column is inverted-indexed. Serialized through Jackson.
+ */
+public class CarbondataColumnConstraint {
+  private final String name;
+  private final boolean invertedindexed;
+  private Optional<Domain> domain;
+
+  /**
+   * @param name            column name, must not be null
+   * @param domain          optional domain for the column, the Optional itself must not be null
+   * @param invertedindexed inverted-index flag of the column
+   */
+  @JsonCreator public CarbondataColumnConstraint(@JsonProperty("name") String name,
+      @JsonProperty("domain") Optional<Domain> domain,
+      @JsonProperty("invertedindexed") boolean invertedindexed) {
+    this.name = requireNonNull(name, "name is null");
+    // a primitive boolean can never be null, so it is stored without a null check
+    this.invertedindexed = invertedindexed;
+    this.domain = requireNonNull(domain, "domain is null");
+  }
+
+  @JsonProperty public boolean isInvertedindexed() {
+    return this.invertedindexed;
+  }
+
+  @JsonProperty public String getName() {
+    return this.name;
+  }
+
+  @JsonProperty public Optional<Domain> getDomain() {
+    return this.domain;
+  }
+
+  /** Replaces the domain; note that the domain takes part in equals/hashCode. */
+  @JsonSetter public void setDomain(Optional<Domain> domain) {
+    this.domain = domain;
+  }
+
+  @Override public int hashCode() {
+    return Objects.hash(name, domain, invertedindexed);
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+
+    CarbondataColumnConstraint that = (CarbondataColumnConstraint) obj;
+    if (!Objects.equals(name, that.name)) {
+      return false;
+    }
+    if (!Objects.equals(domain, that.domain)) {
+      return false;
+    }
+    return invertedindexed == that.invertedindexed;
+  }
+
+  @Override public String toString() {
+    return toStringHelper(this)
+        .add("name", name)
+        .add("invertedindexed", invertedindexed)
+        .add("domain", domain)
+        .toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
index cc10165..b9152b5 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataColumnHandle.java
@@ -68,12 +68,6 @@ public class CarbondataColumnHandle implements ColumnHandle {
return columnUniqueId;
}
- /**
- * ordinalPosition of a columnhandle is the -> number of the column in the entire list of columns of this table
- * IT DOESNT DEPEND ON THE QUERY (select clm3, clm0, clm1 from tablename)
- * The columnhandle of clm3 : has ordinalposition = 3
- */
-
@JsonCreator public CarbondataColumnHandle(@JsonProperty("connectorId") String connectorId,
@JsonProperty("columnName") String columnName, @JsonProperty("columnType") Type columnType,
@JsonProperty("ordinalPosition") int ordinalPosition,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..66c007d
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataConnectorFactory.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Factory that builds the Carbondata connector.
+ * It will be called by CarbondataPlugin.
+ */
+public class CarbondataConnectorFactory implements ConnectorFactory {
+
+  private final String name;
+  private final ClassLoader classLoader;
+
+  /**
+   * @param connectorName name reported by {@link #getName()}
+   * @param classLoader   class loader installed as thread context loader while a connector is
+   *                      created, and handed to the connector; must not be null
+   */
+  public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader) {
+    this.name = connectorName;
+    this.classLoader = requireNonNull(classLoader, "classLoader is null");
+  }
+
+  @Override public String getName() {
+    return name;
+  }
+
+  @Override public ConnectorHandleResolver getHandleResolver() {
+    return new CarbondataHandleResolver();
+  }
+
+  /**
+   * Bootstraps the injector from the JSON module and the Carbondata module, then assembles the
+   * connector from the injected services. Any failure is rethrown through
+   * {@code Throwables.propagate}.
+   */
+  @Override public Connector create(String connectorId, Map<String, String> config,
+      ConnectorContext context) {
+    requireNonNull(config, "config is null");
+
+    try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+      Bootstrap bootstrap = new Bootstrap(new JsonModule(),
+          new CarbondataModule(connectorId, context.getTypeManager()));
+
+      Injector injector = bootstrap
+          .strictConfig()
+          .doNotInitializeLogging()
+          .setRequiredConfigurationProperties(config)
+          .initialize();
+
+      LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+      CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+      ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+      ConnectorRecordSetProvider recordSetProvider =
+          injector.getInstance(ConnectorRecordSetProvider.class);
+
+      // Only the split manager is wrapped in a class-loader-safe decorator; the record set
+      // provider is passed through as injected.
+      ConnectorSplitManager safeSplitManager =
+          new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader);
+
+      return new CarbondataConnector(lifeCycleManager, metadata, safeSplitManager,
+          recordSetProvider, classLoader);
+    } catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
index d2c5ab6..d938a3d 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataMetadata.java
@@ -69,13 +69,6 @@ public class CarbondataMetadata implements ConnectorMetadata {
@Override
public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
- /*List<SchemaTableName> all = carbonTableReader.getTableList();
- if(schemaNameOrNull != null)
- {
- return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
- }
- return all;*/
-
List<String> schemaNames;
if (schemaNameOrNull != null) {
schemaNames = ImmutableList.of(schemaNameOrNull);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
new file mode 100755
index 0000000..ad47f75
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordCursor.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.base.Strings;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Presto {@link RecordCursor} over a CarbonData row iterator. Each row is decoded through the
+ * read support and kept as a list of strings (null for null values); the typed getters parse
+ * those strings on demand.
+ */
+public class CarbondataRecordCursor implements RecordCursor {
+
+  private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+  private final List<CarbondataColumnHandle> columnHandles;
+
+  // String form of the current row; null until the cursor has been advanced once.
+  private List<String> fields;
+  private CarbondataSplit split;
+  private CarbonIterator<Object[]> rowCursor;
+  private CarbonReadSupport<Object[]> readSupport;
+
+  // Running sum of the number of columns read so far (see advanceNextPosition).
+  private long totalBytes;
+  private long nanoStart;
+  private long nanoEnd;
+
+  /**
+   * @param readSupport    decodes the raw rows produced by the iterator
+   * @param carbonIterator source of raw rows
+   * @param columnHandles  projected columns, in field order
+   * @param split          split this cursor reads
+   */
+  public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport,
+      CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles,
+      CarbondataSplit split) {
+    this.rowCursor = carbonIterator;
+    this.columnHandles = columnHandles;
+    this.readSupport = readSupport;
+    this.split = split;
+    this.totalBytes = 0;
+  }
+
+  @Override public long getTotalBytes() {
+    return totalBytes;
+  }
+
+  @Override public long getCompletedBytes() {
+    return totalBytes;
+  }
+
+  /** Time since the first advance, measured up to close() once the cursor has been closed. */
+  @Override public long getReadTimeNanos() {
+    return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+  }
+
+  @Override public Type getType(int field) {
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    return columnHandles.get(field).getColumnType();
+  }
+
+  /**
+   * Moves to the next row and caches its values as strings.
+   *
+   * @return true when a row was read, false when the iterator is exhausted
+   */
+  @Override public boolean advanceNextPosition() {
+    if (nanoStart == 0) {
+      nanoStart = System.nanoTime();
+    }
+
+    if (!rowCursor.hasNext()) {
+      return false;
+    }
+
+    Object[] columns = readSupport.readRow(rowCursor.next());
+    List<String> row = new ArrayList<>(columns == null ? 0 : columns.length);
+    if (columns != null) {
+      for (Object value : columns) {
+        row.add(value == null ? null : value.toString());
+      }
+      // Counted inside the null guard: a null row yields an empty field list and adds nothing.
+      totalBytes += columns.length;
+    }
+    fields = row;
+    return true;
+  }
+
+  @Override public boolean getBoolean(int field) {
+    checkFieldType(field, BOOLEAN);
+    return Boolean.parseBoolean(getFieldValue(field));
+  }
+
+  /**
+   * Returns the field as a long. Values written as plain integers are parsed exactly; any
+   * other numeric text is parsed as a double and rounded.
+   */
+  @Override public long getLong(int field) {
+    String value = getFieldValue(field);
+    try {
+      // Parse integral text directly so large values do not lose precision through double.
+      return Long.parseLong(value);
+    } catch (NumberFormatException notIntegral) {
+      return Math.round(Double.parseDouble(value));
+    }
+  }
+
+  @Override public double getDouble(int field) {
+    checkFieldType(field, DOUBLE);
+    return Double.parseDouble(getFieldValue(field));
+  }
+
+  @Override public Slice getSlice(int field) {
+    checkFieldType(field, VARCHAR);
+    return Slices.utf8Slice(getFieldValue(field));
+  }
+
+  @Override public Object getObject(int field) {
+    return null;
+  }
+
+  @Override public boolean isNull(int field) {
+    checkArgument(field < columnHandles.size(), "Invalid field index");
+    // NOTE(review): an empty string is reported as NULL as well - confirm this is intended.
+    return Strings.isNullOrEmpty(getFieldValue(field));
+  }
+
+  /** Returns the string form of a field of the current row; the cursor must have advanced. */
+  String getFieldValue(int field) {
+    checkState(fields != null, "Cursor has not been advanced yet");
+    return fields.get(field);
+  }
+
+  private void checkFieldType(int field, Type expected) {
+    Type actual = getType(field);
+    checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field,
+        expected, actual);
+  }
+
+  @Override public void close() {
+    nanoEnd = System.nanoTime();
+
+    //todo delete cache from readSupport
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
new file mode 100755
index 0000000..d3fd7a0
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSet.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+//import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+
+/**
+ * Presto {@link RecordSet} for a single Carbondata split.
+ *
+ * The record set holds the query model prepared by the record set provider and, when a
+ * cursor is requested, restricts that model to the block described by the split and
+ * executes it through the Carbondata query executor.
+ */
+public class CarbondataRecordSet implements RecordSet {
+
+  private CarbonTable carbonTable;
+  private TupleDomain<ColumnHandle> originalConstraint;
+  private Expression carbonConstraint;
+  private List<CarbondataColumnConstraint> rebuildConstraints;
+  private QueryModel queryModel;
+  private CarbondataSplit split;
+  private List<CarbondataColumnHandle> columns;
+  private QueryExecutor queryExecutor;
+
+  private CarbonReadSupport<Object[]> readSupport;
+
+  /**
+   * @param carbonTable table the split belongs to
+   * @param session     Presto session (currently unused)
+   * @param split       split to read; must be a {@link CarbondataSplit}
+   * @param columns     projected columns, in output order
+   * @param queryModel  Carbondata query model to execute for this split
+   */
+  public CarbondataRecordSet(CarbonTable carbonTable, ConnectorSession session,
+      ConnectorSplit split, List<CarbondataColumnHandle> columns, QueryModel queryModel) {
+    this.carbonTable = carbonTable;
+    this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+    this.originalConstraint = this.split.getConstraints();
+    this.rebuildConstraints = this.split.getRebuildConstraints();
+    this.queryModel = queryModel;
+    this.columns = columns;
+    this.readSupport = new DictionaryDecodeReadSupport();
+  }
+
+  // TODO: support later
+  private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+    return null;
+  }
+
+  /**
+   * @return the Presto types of the projected columns, in column order
+   */
+  @Override public List<Type> getColumnTypes() {
+    return columns.stream().map(CarbondataColumnHandle::getColumnType)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Gets data blocks via the Carbondata QueryModel API and exposes them as a cursor.
+   *
+   * @return a cursor over the rows of this split; never null
+   * @throws RuntimeException if the read support cannot be initialized or the query fails;
+   *                          the original exception is attached as the cause
+   */
+  @Override public RecordCursor cursor() {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+
+    tableBlockInfoList.add(new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+        split.getLocalInputSplit().getStart(), split.getLocalInputSplit().getSegmentId(),
+        split.getLocalInputSplit().getLocations().toArray(new String[0]),
+        split.getLocalInputSplit().getLength(),
+        ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+
+    queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+
+    try {
+      readSupport
+          .initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+      CarbonIterator<Object[]> carbonIterator =
+          new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+      return new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+    } catch (QueryExecutionException e) {
+      // Surface the failure to the engine instead of handing back a null cursor.
+      throw new RuntimeException(
+          "Carbondata query execution failed for " + split.getLocalInputSplit().getPath(), e);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Failed to create Carbondata cursor for " + split.getLocalInputSplit().getPath(), e);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
index 63b926f..8b087df 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataRecordSetProvider.java
@@ -72,7 +72,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
requireNonNull(split, "split is null");
requireNonNull(columns, "columns is null");
- // Convert split
CarbondataSplit cdSplit =
checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
@@ -111,11 +110,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
// Return new record set
- return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit,
+ return new CarbondataRecordSet(targetTable, session, cdSplit,
handles.build(), queryModel);
}
- // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+ // Build filter for QueryModel
private void fillFilter2QueryModel(QueryModel queryModel,
TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
@@ -139,14 +138,9 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
if (domain.getValues().isNone()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
- //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
- //new Expression()
}
if (domain.getValues().isAll()) {
- //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
- //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
}
List<Object> singleValues = new ArrayList<>();
@@ -166,7 +160,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
} else {
GreaterThanExpression greater = new GreaterThanExpression(colExpression,
new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
rangeFilter.add(greater);
}
break;
@@ -174,7 +167,6 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
GreaterThanEqualToExpression greater =
new GreaterThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
- //greater.setRangeExpression(true);
rangeFilter.add(greater);
break;
case BELOW:
@@ -191,13 +183,11 @@ public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
case EXACTLY:
LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression,
new LiteralExpression(value, coltype));
- //less.setRangeExpression(true);
rangeFilter.add(less);
break;
case BELOW:
LessThanExpression less2 =
new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
- //less2.setRangeExpression(true);
rangeFilter.add(less2);
break;
default:
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
new file mode 100755
index 0000000..f3efb36
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/CarbondataSplitManager.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto;
+
+import org.apache.carbondata.presto.impl.CarbonLocalInputSplit;
+import org.apache.carbondata.presto.impl.CarbonTableCacheModel;
+import org.apache.carbondata.presto.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.carbondata.presto.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Builds the Presto splits of a Carbondata table.
+ *
+ * The query constraint is translated into a Carbondata filter expression and handed to the
+ * table reader so that irrelevant blocks can be filtered out.
+ */
+public class CarbondataSplitManager implements ConnectorSplitManager {
+
+  private final String connectorId;
+  private final CarbonTableReader carbonTableReader;
+
+  @Inject
+  public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+    this.carbonTableReader = requireNonNull(reader, "reader is null");
+  }
+
+  /**
+   * Computes the splits of the table referenced by the given layout.
+   *
+   * @return a split source with one split per Carbondata input split; never null
+   * @throws TableNotFoundException if the table reader has no cache entry for the table
+   * @throws RuntimeException       if the input splits cannot be computed; the original
+   *                                exception is attached as the cause
+   */
+  public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle,
+      ConnectorSession session, ConnectorTableLayoutHandle layout) {
+    CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+    CarbondataTableHandle tableHandle = layoutHandle.getTable();
+    SchemaTableName key = tableHandle.getSchemaTableName();
+
+    // Packaging presto-TupleDomain into CarbondataColumnConstraint,
+    // to decouple from presto-spi Module
+    List<CarbondataColumnConstraint> rebuildConstraints =
+        getColumnConstraints(layoutHandle.getConstraint());
+
+    CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+    // The cache must be checked before it is dereferenced below.
+    if (cache == null) {
+      throw new TableNotFoundException(key);
+    }
+    Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+    List<CarbonLocalInputSplit> splits;
+    try {
+      splits = carbonTableReader.getInputSplits2(cache, filters);
+    } catch (Exception ex) {
+      // Report the failure instead of returning a null split source.
+      throw new RuntimeException("Failed to compute Carbondata splits for table " + key, ex);
+    }
+
+    ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+    for (CarbonLocalInputSplit split : splits) {
+      cSplits.add(new CarbondataSplit(connectorId, tableHandle.getSchemaTableName(),
+          layoutHandle.getConstraint(), split, rebuildConstraints));
+    }
+    return new FixedSplitSource(cSplits.build());
+  }
+
+  /**
+   * Repackages a Presto TupleDomain as a list of per-column constraints.
+   *
+   * @param constraint presto-TupleDomain
+   * @return one constraint per constrained column; empty when the tuple domain exposes no
+   * column domains
+   */
+  public List<CarbondataColumnConstraint> getColumnConstraints(
+      TupleDomain<ColumnHandle> constraint) {
+    ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+    if (!constraint.getColumnDomains().isPresent()) {
+      return constraintBuilder.build();
+    }
+    for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains()
+        .get()) {
+      CarbondataColumnHandle columnHandle =
+          checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+      constraintBuilder.add(new CarbondataColumnConstraint(columnHandle.getColumnName(),
+          Optional.of(columnDomain.getDomain()), columnHandle.isInvertedIndex()));
+    }
+
+    return constraintBuilder.build();
+  }
+
+  /**
+   * Convert presto-TupleDomain predication into Carbon scan express condition.
+   *
+   * Each constrained column yields one expression: the OR of its single values (as an
+   * equality or IN test) and of its ranges, where the two bounds of a range are ANDed.
+   * The per-column expressions are ANDed together. Columns whose domain cannot be expressed
+   * safely (all values, no values, or null allowed) are left unfiltered.
+   *
+   * @param originalConstraint presto-TupleDomain
+   * @param carbonTable        table whose columns the constraint refers to
+   * @return the combined filter, or null when nothing can be pushed down
+   */
+  public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint,
+      CarbonTable carbonTable) {
+    if (!originalConstraint.getDomains().isPresent()) {
+      return null;
+    }
+
+    List<CarbonColumn> ccols = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+    List<Expression> filters = new ArrayList<>();
+
+    for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+      CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+      Type type = cdch.getColumnType();
+
+      Optional<CarbonColumn> target =
+          ccols.stream().filter(a -> a.getColName().equals(cdch.getColumnName())).findFirst();
+      if (!target.isPresent()) {
+        return null;
+      }
+
+      DataType coltype = target.get().getDataType();
+      ColumnExpression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+      colExpression.setDimension(target.get().isDimesion());
+      colExpression.setDimension(
+          carbonTable.getDimensionByName(carbonTable.getFactTableName(), cdch.getColumnName()));
+      colExpression.setCarbonColumn(target.get());
+
+      Domain domain = originalConstraint.getDomains().get().get(c);
+      checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+      // Nothing to push down for these domains; a value filter would also drop null rows
+      // that a null-allowing domain still has to match.
+      if (domain.getValues().isNone() || domain.getValues().isAll() || domain.isNullAllowed()) {
+        continue;
+      }
+
+      Expression columnFilter = buildColumnFilter(domain, type, coltype, colExpression);
+      if (columnFilter != null) {
+        filters.add(columnFilter);
+      }
+    }
+
+    if (filters.isEmpty()) {
+      return null; // no filter
+    }
+    Expression finalFilters = filters.get(0);
+    for (int i = 1; i < filters.size(); i++) {
+      finalFilters = new AndExpression(finalFilters, filters.get(i));
+    }
+    return finalFilters;
+  }
+
+  /**
+   * Builds the filter of one column: the OR of all its ranges and single values.
+   * Returns null when some range cannot be translated, because leaving out one alternative
+   * of an OR would wrongly exclude rows.
+   */
+  private Expression buildColumnFilter(Domain domain, Type type, DataType coltype,
+      ColumnExpression colExpression) {
+    List<Object> singleValues = new ArrayList<>();
+    List<Expression> alternatives = new ArrayList<>();
+
+    for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+      checkState(!range.isAll()); // all-value domains are skipped by the caller
+      if (range.isSingleValue()) {
+        singleValues.add(range.getLow().getValue());
+      } else {
+        Expression rangeFilter = buildRangeFilter(range, type, coltype, colExpression);
+        if (rangeFilter == null) {
+          return null;
+        }
+        alternatives.add(rangeFilter);
+      }
+    }
+
+    if (singleValues.size() == 1) {
+      alternatives.add(
+          new EqualToExpression(colExpression, toLiteral(singleValues.get(0), type, coltype)));
+    } else if (singleValues.size() > 1) {
+      List<Expression> candidates = singleValues.stream().map(v -> toLiteral(v, type, coltype))
+          .collect(Collectors.toList());
+      alternatives.add(new InExpression(colExpression, new ListExpression(candidates)));
+    }
+
+    if (alternatives.isEmpty()) {
+      return null;
+    }
+    Expression combined = alternatives.get(0);
+    for (int i = 1; i < alternatives.size(); i++) {
+      combined = new OrExpression(combined, alternatives.get(i));
+    }
+    return combined;
+  }
+
+  /**
+   * Builds the filter of one range: its lower and upper bound ANDed together.
+   * Returns null when neither bound can be translated.
+   */
+  private Expression buildRangeFilter(Range range, Type type, DataType coltype,
+      ColumnExpression colExpression) {
+    Expression lower = null;
+    if (!range.getLow().isLowerUnbounded()) {
+      Object value = ConvertDataByType(range.getLow().getValue(), type);
+      switch (range.getLow().getBound()) {
+        case ABOVE:
+          if (type == TimestampType.TIMESTAMP) {
+            // TODO: exclusive lower bounds on timestamps are not pushed down yet
+          } else {
+            lower = new GreaterThanExpression(colExpression,
+                new LiteralExpression(value, coltype));
+          }
+          break;
+        case EXACTLY:
+          lower = new GreaterThanEqualToExpression(colExpression,
+              new LiteralExpression(value, coltype));
+          break;
+        case BELOW:
+          throw new IllegalArgumentException("Low marker should never use BELOW bound");
+        default:
+          throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+      }
+    }
+
+    Expression upper = null;
+    if (!range.getHigh().isUpperUnbounded()) {
+      Object value = ConvertDataByType(range.getHigh().getValue(), type);
+      switch (range.getHigh().getBound()) {
+        case ABOVE:
+          throw new IllegalArgumentException("High marker should never use ABOVE bound");
+        case EXACTLY:
+          upper = new LessThanEqualToExpression(colExpression,
+              new LiteralExpression(value, coltype));
+          break;
+        case BELOW:
+          upper = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+          break;
+        default:
+          throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+      }
+    }
+
+    if (lower == null) {
+      return upper;
+    }
+    if (upper == null) {
+      return lower;
+    }
+    return new AndExpression(lower, upper);
+  }
+
+  /**
+   * Wraps a Presto value as a Carbondata literal of the given column type.
+   */
+  private Expression toLiteral(Object rawdata, Type type, DataType coltype) {
+    Object value = ConvertDataByType(rawdata, type);
+    if (coltype.equals(DataType.STRING) && value instanceof Slice) {
+      value = ((Slice) value).toStringUtf8();
+    }
+    return new LiteralExpression(value, coltype);
+  }
+
+  /**
+   * Convert presto spi Type into Carbondata Type
+   *
+   * @param colType presto spi type
+   * @return the matching Carbondata type; STRING for any type without a mapping
+   */
+  public static DataType Spi2CarbondataTypeMapper(Type colType) {
+    if (colType == BooleanType.BOOLEAN) return DataType.BOOLEAN;
+    else if (colType == SmallintType.SMALLINT) return DataType.SHORT;
+    else if (colType == IntegerType.INTEGER) return DataType.INT;
+    else if (colType == BigintType.BIGINT) return DataType.LONG;
+    else if (colType == DoubleType.DOUBLE) return DataType.DOUBLE;
+    // Decimal types are created per precision/scale, so match on the class.
+    else if (colType instanceof DecimalType) return DataType.DECIMAL;
+    else if (colType == VarcharType.VARCHAR) return DataType.STRING;
+    else if (colType == DateType.DATE) return DataType.DATE;
+    else if (colType == TimestampType.TIMESTAMP) return DataType.TIMESTAMP;
+    else return DataType.STRING;
+  }
+
+  /**
+   * Converts a value taken from a Presto domain into the Java type used for Carbondata
+   * literals of the given Presto type.
+   *
+   * @param rawdata value as stored in the Presto range marker
+   * @param type    Presto type of the column
+   * @return the converted value, or rawdata unchanged for types without a conversion
+   */
+  public Object ConvertDataByType(Object rawdata, Type type) {
+    if (type.equals(IntegerType.INTEGER)) return Integer.valueOf(rawdata.toString());
+    else if (type.equals(BigintType.BIGINT)) return (Long) rawdata;
+    else if (type.equals(VarcharType.VARCHAR)) return ((Slice) rawdata).toStringUtf8();
+    else if (type.equals(BooleanType.BOOLEAN)) return (Boolean) (rawdata);
+
+    return rawdata;
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
new file mode 100755
index 0000000..45755d1
--- /dev/null
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableCacheModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.presto.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+/**
+ * Holder for the Carbon metadata of one table (table identifier, table path, table info,
+ * CarbonTable and segments), cached in class CarbonTableReader to speed up queries.
+ */
+public class CarbonTableCacheModel {
+
+  public CarbonTableIdentifier carbonTableIdentifier;
+  public CarbonTablePath carbonTablePath;
+
+  public TableInfo tableInfo;
+  public CarbonTable carbonTable;
+  public String[] segments;
+
+  /**
+   * @return true when the table, its path and its identifier have all been set
+   */
+  public boolean isValid() {
+    return carbonTable != null && carbonTablePath != null && carbonTableIdentifier != null;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
index a0ef63f..677cefd 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableConfig.java
@@ -21,6 +21,9 @@ import io.airlift.configuration.Config;
import javax.validation.constraints.NotNull;
+/**
+ * Configuration read from etc/catalog/carbondata.properties
+ */
public class CarbonTableConfig {
//read from config
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f32c503a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index 14ecfbc..bb482b0 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -71,22 +71,21 @@ import java.util.stream.Stream;
import static java.util.Objects.requireNonNull;
+/** CarbonTableReader will be a facade of these utils
+ *
+ * 1:CarbonMetadata,(logic table)
+ * 2:FileFactory, (physic table file)
+ * 3:CarbonCommonFactory, (offer some )
+ * 4:DictionaryFactory, (parse dictionary util)
+ */
public class CarbonTableReader {
- /** CarbonTableReader will be a facade of these utils
- *
- * 1:CarbonMetadata,(logic table)
- * 2:FileFactory, (physic table file)
- * 3:CarbonCommonFactory, (offer some )
- * 4:DictionaryFactory, (parse dictionary util)
- */
-
private CarbonTableConfig config;
private List<SchemaTableName> tableList;
private CarbonFile dbStore;
private FileFactory.FileType fileType;
- //as a cache for Carbon reader
+ // A cache for Carbon reader
private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;
@Inject public CarbonTableReader(CarbonTableConfig config) {
@@ -94,7 +93,7 @@ public class CarbonTableReader {
this.cc = new ConcurrentHashMap<>();
}
- //for worker node to initialize carbon metastore
+ // for worker node to initialize carbon metastore
public CarbonTableCacheModel getCarbonCache(SchemaTableName table) {
if (!cc.containsKey(table)) {
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(
@@ -120,7 +119,7 @@ public class CarbonTableReader {
return updateSchemaList();
}
- //default PathFilter
+ // default PathFilter
private static final PathFilter DefaultFilter = new PathFilter() {
@Override public boolean accept(Path path) {
return CarbonTablePath.isCarbonDataFile(path.getName());
@@ -177,7 +176,7 @@ public class CarbonTableReader {
}
public void updateSchemaTables() {
- //update logic determine later
+ // update logic determine later
if (dbStore == null) {
updateSchemaList();
}
@@ -232,14 +231,14 @@ public class CarbonTableReader {
(org.apache.carbondata.format.TableInfo) thriftReader.read();
thriftReader.close();
- //Format Level TableInfo, need transfer to Code Level TableInfo
+ // Step3: Transform Format Level TableInfo to Code Level TableInfo
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
TableInfo wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
storePath);
wrapperTableInfo.setMetaDataFilepath(
CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
- //load metadata info into CarbonMetadata
+ // Step4: Load metadata info into CarbonMetadata
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
cache.tableInfo = wrapperTableInfo;
@@ -256,7 +255,7 @@ public class CarbonTableReader {
public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel,
Expression filters) throws Exception {
- //处理filter, 下推filter,将应用在Segment的索引上
+ // need apply filters to segment
FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -297,7 +296,7 @@ public class CarbonTableReader {
.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
List<CarbonLocalInputSplit> result = new ArrayList<>();
- //for each segment fetch blocks matching filter in Driver BTree
+ // for each segment fetch blocks matching filter in Driver BTree
for (String segmentNo : tableCacheModel.segments) {
try {
List<DataRefNode> dataRefNodes =
@@ -338,7 +337,7 @@ public class CarbonTableReader {
//DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
//QueryStatistic statistic = new QueryStatistic();
- //读取Segment 内部的Index
+ // read segment index
Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
updateStatusManager);
@@ -349,13 +348,10 @@ public class CarbonTableReader {
// build result
for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
List<DataRefNode> filterredBlocks;
- // if no filter is given get all blocks from Btree Index
+ // if no filter is given, get all blocks from Btree Index
if (null == resolver) {
filterredBlocks = getDataBlocksOfIndex(abstractIndex);
} else {
- //ignore filter
- //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
-
// apply filter and get matching blocks
filterredBlocks = filterExpressionProcessor
.getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
@@ -568,12 +564,10 @@ public class CarbonTableReader {
try {
Configuration conf = new Configuration();
fs = segmentPath.getFileSystem(conf);
- //fs.initialize(segmentPath.toUri(), conf);
RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
while (iter.hasNext()) {
LocatedFileStatus stat = iter.next();
- //if(stat.getPath().toString().contains("carbondata"))//参看carbondata的carbonInputFilter的实现
if (DefaultFilter.accept(stat.getPath())) {
if (stat.isDirectory()) {
addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);