You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2017/03/23 05:13:13 UTC
[1/3] incubator-carbondata git commit: add presto integration 0.0.1
Repository: incubator-carbondata
Updated Branches:
refs/heads/master e441ab0d4 -> 9d7dbea38
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
new file mode 100755
index 0000000..692d69e
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplitManager.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
+import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Split manager for Carbondata tables. It turns a table layout into one split per Carbondata
+ * input split, and pushes the layout's {@link TupleDomain} down as a Carbondata filter
+ * {@link Expression} where that can be done without dropping qualifying rows.
+ */
+public class CarbondataSplitManager
+    implements ConnectorSplitManager
+{
+    private final String connectorId;
+    private final CarbonTableReader carbonTableReader;
+
+    @Inject
+    public CarbondataSplitManager(CarbondataConnectorId connectorId, CarbonTableReader reader)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        this.carbonTableReader = requireNonNull(reader, "client is null");
+    }
+
+    /**
+     * Lists the splits of the table referenced by {@code layout}.
+     *
+     * @throws TableNotFoundException if the table reader has no cached model for the table
+     * @throws PrestoException if computing the Carbondata input splits fails
+     */
+    @Override
+    public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorTableLayoutHandle layout)
+    {
+        CarbondataTableLayoutHandle layoutHandle = (CarbondataTableLayoutHandle) layout;
+        CarbondataTableHandle tableHandle = layoutHandle.getTable();
+        SchemaTableName key = tableHandle.getSchemaTableName();
+
+        // Per-column domains; attached to every split built below.
+        List<CarbondataColumnConstraint> rebuildConstraints = getColumnConstraints(layoutHandle.getConstraint());
+
+        CarbonTableCacheModel cache = carbonTableReader.getCarbonCache(key);
+        if (cache == null) {
+            // Must be checked before cache.carbonTable is read. Fail explicitly rather than
+            // handing the caller a null split source.
+            throw new TableNotFoundException(key);
+        }
+        Expression filters = parseFilterExpression(layoutHandle.getConstraint(), cache.carbonTable);
+
+        List<CarbonLocalInputSplit> splits;
+        try {
+            splits = carbonTableReader.getInputSplits2(cache, filters);
+        }
+        catch (Exception ex) {
+            // Propagate with the cause attached instead of printing it and returning null.
+            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to get splits for table " + key, ex);
+        }
+
+        ImmutableList.Builder<ConnectorSplit> cSplits = ImmutableList.builder();
+        for (CarbonLocalInputSplit split : splits) {
+            cSplits.add(new CarbondataSplit(
+                    connectorId,
+                    key,
+                    layoutHandle.getConstraint(),
+                    split,
+                    rebuildConstraints));
+        }
+        return new FixedSplitSource(cSplits.build());
+    }
+
+    /**
+     * Converts each column domain of {@code constraint} into a {@link CarbondataColumnConstraint}.
+     * A TupleDomain without column domains (the "none" domain) yields an empty list.
+     *
+     * @throws IllegalArgumentException if a column handle is not a {@link CarbondataColumnHandle}
+     */
+    public List<CarbondataColumnConstraint> getColumnConstraints(TupleDomain<ColumnHandle> constraint)
+    {
+        ImmutableList.Builder<CarbondataColumnConstraint> constraintBuilder = ImmutableList.builder();
+        for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : constraint.getColumnDomains().orElse(ImmutableList.of())) {
+            CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+
+            constraintBuilder.add(new CarbondataColumnConstraint(
+                    columnHandle.getColumnName(),
+                    Optional.of(columnDomain.getDomain()),
+                    columnHandle.isInvertedIndex()));
+        }
+        return constraintBuilder.build();
+    }
+
+    /**
+     * Translates {@code originalConstraint} into a Carbondata filter expression: the AND of one
+     * filter per column. A column whose domain cannot be expressed without dropping rows
+     * contributes no filter.
+     *
+     * @return the combined filter, or {@code null} when nothing can be pushed down (no column
+     *         domains, an unknown column, or no expressible column filter)
+     */
+    public Expression parseFilterExpression(TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable)
+    {
+        if (!originalConstraint.getColumnDomains().isPresent()) {
+            return null;
+        }
+
+        // Loop-invariant: the table's columns in creation order.
+        List<CarbonColumn> columns = carbonTable.getCreateOrderColumn(carbonTable.getFactTableName());
+        List<Expression> columnFilters = new ArrayList<>();
+        for (TupleDomain.ColumnDomain<ColumnHandle> columnDomain : originalConstraint.getColumnDomains().get()) {
+            CarbondataColumnHandle columnHandle = checkType(columnDomain.getColumn(), CarbondataColumnHandle.class, "column handle");
+            Optional<CarbonColumn> target = columns.stream()
+                    .filter(column -> column.getColName().equals(columnHandle.getColumnName()))
+                    .findFirst();
+            if (!target.isPresent()) {
+                // Column not found in the Carbondata schema: push down no filter at all.
+                return null;
+            }
+
+            Expression columnFilter = buildColumnFilter(columnHandle, target.get(), columnDomain.getDomain(), carbonTable);
+            if (columnFilter != null) {
+                columnFilters.add(columnFilter);
+            }
+        }
+        return combine(columnFilters, true);
+    }
+
+    /**
+     * Builds the filter for one column: the OR of an equality or IN over its single values and
+     * one conjunction per range.
+     *
+     * @return the filter, or {@code null} if the domain cannot be pushed down without dropping rows
+     */
+    private Expression buildColumnFilter(CarbondataColumnHandle columnHandle, CarbonColumn carbonColumn, Domain domain, CarbonTable carbonTable)
+    {
+        Type type = columnHandle.getColumnType();
+        DataType coltype = carbonColumn.getDataType();
+
+        ColumnExpression colExpression = new ColumnExpression(columnHandle.getColumnName(), coltype);
+        colExpression.setDimension(carbonColumn.isDimesion());
+        colExpression.setDimension(carbonTable.getDimensionByName(carbonTable.getFactTableName(), columnHandle.getColumnName()));
+        colExpression.setCarbonColumn(carbonColumn);
+
+        checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+        // The predicates built here never match NULL, so a domain that admits NULL is not pushed down.
+        // A domain that is unrestricted (all) or empty (none) is not pushed down either.
+        if (domain.isNullAllowed() || domain.getValues().isAll() || domain.getValues().isNone()) {
+            return null;
+        }
+
+        List<Object> singleValues = new ArrayList<>();
+        List<Expression> disjuncts = new ArrayList<>();
+        for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+            checkState(!range.isAll()); // excluded by the isAll() check above
+            if (range.isSingleValue()) {
+                singleValues.add(range.getLow().getValue());
+                continue;
+            }
+            Expression rangeExpression = buildRangeExpression(colExpression, range, type, coltype);
+            if (rangeExpression == null) {
+                // This range imposes no pushed-down bound, so the column as a whole is unrestricted.
+                return null;
+            }
+            disjuncts.add(rangeExpression);
+        }
+
+        if (singleValues.size() == 1) {
+            disjuncts.add(0, new EqualToExpression(colExpression, toLiteral(singleValues.get(0), type, coltype)));
+        }
+        else if (singleValues.size() > 1) {
+            List<Expression> candidates = singleValues.stream()
+                    .map(value -> (Expression) toLiteral(value, type, coltype))
+                    .collect(Collectors.toList());
+            disjuncts.add(0, new InExpression(colExpression, new ListExpression(candidates)));
+        }
+        return combine(disjuncts, false);
+    }
+
+    /**
+     * Builds the AND of the bounds of one non-single-value range.
+     * Strict lower bounds on TIMESTAMP are not pushed down.
+     *
+     * @return the conjunction, or {@code null} if no bound was pushed down
+     */
+    private Expression buildRangeExpression(ColumnExpression colExpression, Range range, Type type, DataType coltype)
+    {
+        List<Expression> bounds = new ArrayList<>();
+        if (!range.getLow().isLowerUnbounded()) {
+            LiteralExpression low = toLiteral(range.getLow().getValue(), type, coltype);
+            switch (range.getLow().getBound()) {
+                case ABOVE:
+                    if (type != TimestampType.TIMESTAMP) {
+                        bounds.add(new GreaterThanExpression(colExpression, low));
+                    }
+                    // TODO: strict lower bounds on TIMESTAMP are not pushed down yet.
+                    break;
+                case EXACTLY:
+                    bounds.add(new GreaterThanEqualToExpression(colExpression, low));
+                    break;
+                case BELOW:
+                    throw new IllegalArgumentException("Low marker should never use BELOW bound");
+                default:
+                    throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+            }
+        }
+        if (!range.getHigh().isUpperUnbounded()) {
+            LiteralExpression high = toLiteral(range.getHigh().getValue(), type, coltype);
+            switch (range.getHigh().getBound()) {
+                case ABOVE:
+                    throw new IllegalArgumentException("High marker should never use ABOVE bound");
+                case EXACTLY:
+                    bounds.add(new LessThanEqualToExpression(colExpression, high));
+                    break;
+                case BELOW:
+                    bounds.add(new LessThanExpression(colExpression, high));
+                    break;
+                default:
+                    throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+            }
+        }
+        // Both bounds of a single range must hold at once.
+        return combine(bounds, true);
+    }
+
+    /**
+     * Wraps a domain value as a Carbondata literal of {@code coltype}.
+     * The value is converted with {@link #ConvertDataByType} (for example, INTEGER values become
+     * {@code Integer}). Slice values bound for STRING columns are passed as {@code String}.
+     */
+    private LiteralExpression toLiteral(Object value, Type type, DataType coltype)
+    {
+        Object converted = ConvertDataByType(value, type);
+        if (converted instanceof Slice && coltype.equals(DataType.STRING)) {
+            converted = ((Slice) converted).toStringUtf8();
+        }
+        return new LiteralExpression(converted, coltype);
+    }
+
+    /**
+     * Folds {@code expressions} left to right with AND (when {@code conjunction}) or OR.
+     *
+     * @return the combined expression, or {@code null} for an empty list ("no restriction")
+     */
+    private static Expression combine(List<Expression> expressions, boolean conjunction)
+    {
+        Expression result = null;
+        for (Expression expression : expressions) {
+            if (result == null) {
+                result = expression;
+            }
+            else if (conjunction) {
+                result = new AndExpression(result, expression);
+            }
+            else {
+                result = new OrExpression(result, expression);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Maps a Presto SPI type to the corresponding Carbondata {@link DataType}.
+     * VARCHAR of any length, and any type not listed, map to {@link DataType#STRING}.
+     */
+    public static DataType Spi2CarbondataTypeMapper(Type colType)
+    {
+        if (colType == BooleanType.BOOLEAN) {
+            return DataType.BOOLEAN;
+        }
+        if (colType == SmallintType.SMALLINT) {
+            return DataType.SHORT;
+        }
+        if (colType == IntegerType.INTEGER) {
+            return DataType.INT;
+        }
+        if (colType == BigintType.BIGINT) {
+            return DataType.LONG;
+        }
+        if (colType == DoubleType.DOUBLE) {
+            return DataType.DOUBLE;
+        }
+        if (colType instanceof DecimalType) {
+            // Any precision/scale. An identity check against a fresh createDecimalType() never matches.
+            return DataType.DECIMAL;
+        }
+        if (colType == DateType.DATE) {
+            return DataType.DATE;
+        }
+        if (colType == TimestampType.TIMESTAMP) {
+            return DataType.TIMESTAMP;
+        }
+        return DataType.STRING;
+    }
+
+    /**
+     * Converts a raw domain value into the Java object handed to a Carbondata literal.
+     * INTEGER becomes {@code Integer}, VARCHAR of any length becomes {@code String}, BIGINT and
+     * BOOLEAN are cast, and anything else is returned unchanged.
+     */
+    public Object ConvertDataByType(Object rawdata, Type type)
+    {
+        if (type.equals(IntegerType.INTEGER)) {
+            return Integer.valueOf(rawdata.toString());
+        }
+        if (type.equals(BigintType.BIGINT)) {
+            return (Long) rawdata;
+        }
+        if (type instanceof VarcharType) {
+            // Bounded varchar(n) is not equal to unbounded VARCHAR, so test the type family instead.
+            return ((Slice) rawdata).toStringUtf8();
+        }
+        if (type.equals(BooleanType.BOOLEAN)) {
+            return (Boolean) rawdata;
+        }
+        return rawdata;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
new file mode 100755
index 0000000..6b263e0
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableHandle.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Joiner;
+
+import java.util.Objects;
+
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Table handle for the Carbondata connector: a connector id plus a schema-qualified table name.
+ * The connector id is stored lower-cased using the English locale.
+ */
+public class CarbondataTableHandle
+    implements ConnectorTableHandle
+{
+    private final String connectorId;
+    private final SchemaTableName schemaTableName;
+
+    /**
+     * @param connectorId connector identifier; lower-cased (English locale) before being stored
+     * @param schemaTableName schema and table this handle refers to
+     * @throws NullPointerException if either argument is null
+     */
+    @JsonCreator
+    public CarbondataTableHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("schemaTableName") SchemaTableName schemaTableName)
+    {
+        // Null-check before lower-casing so a null id fails with a descriptive message.
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toLowerCase(ENGLISH);
+        // equals() and toString() dereference this field, so reject null up front.
+        this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+    }
+
+    @JsonProperty
+    public String getConnectorId()
+    {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public SchemaTableName getSchemaTableName()
+    {
+        return schemaTableName;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId, schemaTableName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataTableHandle other = (CarbondataTableHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId)
+                && Objects.equals(this.schemaTableName, other.schemaTableName);
+    }
+
+    /** Renders the handle as {@code connectorId:schemaTableName}. */
+    @Override
+    public String toString()
+    {
+        return Joiner.on(":").join(connectorId, schemaTableName.toString());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
new file mode 100755
index 0000000..01434bd
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTableLayoutHandle.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataTableLayoutHandle
+ implements ConnectorTableLayoutHandle
+{
+ private final CarbondataTableHandle table;
+ private final TupleDomain<ColumnHandle> constraint;
+
+ @JsonCreator
+ public CarbondataTableLayoutHandle(@JsonProperty("table") CarbondataTableHandle table,
+ @JsonProperty("constraint") TupleDomain<ColumnHandle> constraint)
+ {
+ this.table = requireNonNull(table, "table is null");
+ this.constraint = requireNonNull(constraint, "constraint is null");
+ }
+
+ @JsonProperty
+ public CarbondataTableHandle getTable()
+ {
+ return table;
+ }
+
+ @JsonProperty
+ public TupleDomain<ColumnHandle> getConstraint()
+ {
+ return constraint;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+
+ CarbondataTableLayoutHandle other = (CarbondataTableLayoutHandle) obj;
+ return Objects.equals(table, other.table)
+ && Objects.equals(constraint, other.constraint);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(table, constraint);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("table", table)
+ .add("constraint", constraint)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
new file mode 100755
index 0000000..a643a33
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+/**
+ * Transaction handle for the Carbondata connector. {@link #INSTANCE} is the only value.
+ */
+public enum CarbondataTransactionHandle
+    implements ConnectorTransactionHandle
+{
+    INSTANCE;
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
new file mode 100755
index 0000000..5212dad
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/Types.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import java.util.Locale;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class Types {
+ private Types() {}
+
+ public static <A, B extends A> B checkType(A value, Class<B> target, String name)
+ {
+ requireNonNull(value, String.format(Locale.ENGLISH, "%s is null", name));
+ checkArgument(target.isInstance(value),
+ "%s must be of type %s, not %s",
+ name,
+ target.getName(),
+ value.getClass().getName());
+ return target.cast(value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
new file mode 100755
index 0000000..6084022
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonLocalInputSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * JSON-serializable description of one Carbondata input split. It holds the segment id, the
+ * file {@code path}, the {@code start}/{@code length} range within it, the locations, the
+ * number of blocklets and the format version.
+ * All fields are assigned once, in the constructor.
+ */
+public class CarbonLocalInputSplit {
+
+    private final String segmentId;
+    private final String path;
+    private final long start;
+    private final long length;
+    private final List<String> locations;
+    private final short version;
+    /**
+     * Number of blocklets in the block.
+     */
+    private final int numberOfBlocklets;
+
+    @JsonCreator
+    public CarbonLocalInputSplit(@JsonProperty("segmentId") String segmentId,
+            @JsonProperty("path") String path,
+            @JsonProperty("start") long start,
+            @JsonProperty("length") long length,
+            @JsonProperty("locations") List<String> locations,
+            @JsonProperty("numberOfBlocklets") int numberOfBlocklets,
+            @JsonProperty("version") short version) {
+        this.segmentId = segmentId;
+        this.path = path;
+        this.start = start;
+        this.length = length;
+        this.locations = locations;
+        this.numberOfBlocklets = numberOfBlocklets;
+        this.version = version;
+    }
+
+    @JsonProperty
+    public short getVersion() {
+        return version;
+    }
+
+    @JsonProperty
+    public List<String> getLocations() {
+        return locations;
+    }
+
+    @JsonProperty
+    public long getLength() {
+        return length;
+    }
+
+    @JsonProperty
+    public long getStart() {
+        return start;
+    }
+
+    @JsonProperty
+    public String getPath() {
+        return path;
+    }
+
+    @JsonProperty
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+    @JsonProperty
+    public int getNumberOfBlocklets() {
+        return numberOfBlocklets;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
new file mode 100755
index 0000000..d47f2a5
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableCacheModel.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public class CarbonTableCacheModel {
+
+ public CarbonTableIdentifier carbonTableIdentifier;
+ public CarbonTablePath carbonTablePath;
+
+ public TableInfo tableInfo;
+ public CarbonTable carbonTable;
+ public String[] segments;
+
+ public boolean isValid()
+ {
+ if(carbonTable != null
+ && carbonTablePath != null
+ && carbonTableIdentifier != null)
+ return true;
+ else
+ return false;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
new file mode 100755
index 0000000..cd52b85
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata.impl;
+
+import io.airlift.configuration.Config;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Connector configuration. All three setters are bound to the same property key,
+ * {@code carbondata-store}, and each getter is annotated {@code @NotNull}.
+ */
+public class CarbonTableConfig {
+    // Values are read from configuration.
+    private String dbPtah;
+    private String tablePath;
+    private String storePath;
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setDbPtah(String dbPtah) {
+        this.dbPtah = dbPtah;
+        return this;
+    }
+
+    @NotNull
+    public String getDbPtah() {
+        return dbPtah;
+    }
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setTablePath(String tablePath) {
+        this.tablePath = tablePath;
+        return this;
+    }
+
+    @NotNull
+    public String getTablePath() {
+        return tablePath;
+    }
+
+    @Config("carbondata-store")
+    public CarbonTableConfig setStorePath(String storePath) {
+        this.storePath = storePath;
+        return this;
+    }
+
+    @NotNull
+    public String getStorePath() {
+        return storePath;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
new file mode 100755
index 0000000..40bb841
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/impl/CarbonTableReader.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.facebook.presto.carbondata.impl;
+
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Inject;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.*;
+import org.apache.carbondata.core.datastore.block.*;
+import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.impl.btree.BTreeDataRefNodeFinder;
+import org.apache.carbondata.core.datastore.impl.btree.BlockBTreeLeafNode;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.converter.SchemaConverter;
+import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.reader.ThriftReader;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
+import org.apache.carbondata.core.scan.filter.FilterUtil;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.service.impl.PathFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CacheClient;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.thrift.TBase;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import static java.util.Objects.requireNonNull;
+
+public class CarbonTableReader {
+ //CarbonTableReader will be a facade of these utils
+ //[
+ // 1:CarbonMetadata,(logic table)
+ // 2:FileFactory, (physic table file)
+ // 3:CarbonCommonFactory, (offer some )
+ // 4:DictionaryFactory, (parse dictionary util)
+ //]
+
+ private CarbonTableConfig config;
+ private List<SchemaTableName> tableList;
+ private CarbonFile dbStore;
+ private FileFactory.FileType fileType;
+
+ private ConcurrentHashMap<SchemaTableName, CarbonTableCacheModel> cc;//as a cache for Carbon reader
+
+ @Inject
+ public CarbonTableReader(CarbonTableConfig config){
+ this.config = requireNonNull(config, "CarbonTableConfig is null");
+ this.cc = new ConcurrentHashMap<>();
+ }
+
+ public CarbonTableCacheModel getCarbonCache(SchemaTableName table){
+ if(!cc.containsKey(table))//for worker node to initalize carbon metastore
+ {
+ try(ThreadContextClassLoader ignored = new ThreadContextClassLoader(FileFactory.class.getClassLoader())) {
+ if(dbStore == null) {
+ fileType = FileFactory.getFileType(config.getStorePath());
+ try{
+ dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ }catch (Exception ex){
+ throw new RuntimeException(ex);
+ }
+ }
+ }
+ updateSchemaTables();
+ parseCarbonMetadata(table);
+ }
+
+ if(cc.containsKey(table))
+ return cc.get(table);
+ else
+ return null;//need to reload?*/
+ }
+
+ public List<String> getSchemaNames() {
+ return updateSchemaList();
+ }
+
+ //default PathFilter
+ private static final PathFilter DefaultFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return CarbonTablePath.isCarbonDataFile(path.getName());
+ }
+ };
+
+ public boolean updateDbStore(){
+ if(dbStore == null) {
+ fileType = FileFactory.getFileType(config.getStorePath());
+ try{
+ dbStore = FileFactory.getCarbonFile(config.getStorePath(), fileType);
+ }catch (Exception ex){
+ throw new RuntimeException(ex);
+ }
+ }
+ return true;
+ }
+
+ public List<String> updateSchemaList() {
+ updateDbStore();
+
+ if(dbStore != null){
+ List<String> scs = Stream.of(dbStore.listFiles()).map(a -> a.getName()).collect(Collectors.toList());
+ return scs;
+ }
+ else
+ return ImmutableList.of();
+ }
+
+
+ public Set<String> getTableNames(String schema) {
+ requireNonNull(schema, "schema is null");
+ return updateTableList(schema);
+ }
+
+ public Set<String> updateTableList(String dbName){
+ List<CarbonFile> schema = Stream.of(dbStore.listFiles()).filter(a -> dbName.equals(a.getName())).collect(Collectors.toList());
+ if(schema.size() > 0)
+ {
+ return Stream.of((schema.get(0)).listFiles()).map(a -> a.getName()).collect(Collectors.toSet());
+ }
+ else
+ return ImmutableSet.of();
+ }
+
+ public CarbonTable getTable(SchemaTableName schemaTableName) {
+ try {
+ updateSchemaTables();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ requireNonNull(schemaTableName, "schemaTableName is null");
+ CarbonTable table = loadTableMetadata(schemaTableName);
+
+ return table;
+ }
+
+
+ public void updateSchemaTables()
+ {
+ //update logic determine later
+ if(dbStore == null)
+ {
+ updateSchemaList();
+ }
+
+ tableList = new LinkedList<>();
+ for(CarbonFile db: dbStore.listFiles())
+ {
+ if(!db.getName().endsWith(".mdt")) {
+ for (CarbonFile table : db.listFiles()) {
+ tableList.add(new SchemaTableName(db.getName(), table.getName()));
+ }
+ }
+ }
+ }
+
+ private CarbonTable loadTableMetadata(SchemaTableName schemaTableName)
+ {
+ for (SchemaTableName table : tableList) {
+ if (!table.equals(schemaTableName))
+ continue;
+
+ return parseCarbonMetadata(table);
+ }
+ return null;
+ }
+
+ /**
+ * parse carbon metadata into cc(CarbonTableReader cache)
+ **/
+ public CarbonTable parseCarbonMetadata(SchemaTableName table)
+ {
+ CarbonTable result = null;
+ try {
+ //\u8fd9\u4e2a\u5e94\u8be5\u653e\u5728StoreFactory
+ CarbonTableCacheModel cache = cc.getOrDefault(table, new CarbonTableCacheModel());
+ if(cache.isValid())
+ return cache.carbonTable;
+
+ //Step1: get table meta path, load carbon table param
+ String storePath = config.getStorePath();
+ cache.carbonTableIdentifier = new CarbonTableIdentifier(table.getSchemaName(), table.getTableName(), UUID.randomUUID().toString());
+ cache.carbonTablePath = PathFactory.getInstance().getCarbonTablePath(storePath, cache.carbonTableIdentifier);
+ cc.put(table, cache);
+
+ //Step2: check file existed? read schema file
+ ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
+ public TBase create() {
+ return new org.apache.carbondata.format.TableInfo();
+ }
+ };
+ ThriftReader thriftReader =
+ new ThriftReader(cache.carbonTablePath.getSchemaFilePath(), createTBase);
+ thriftReader.open();
+ org.apache.carbondata.format.TableInfo tableInfo =
+ (org.apache.carbondata.format.TableInfo) thriftReader.read();
+ thriftReader.close();
+
+ //Format Level\u7684TableInfo\uff0c \u9700\u8981\u8f6c\u6362\u6210Code Level\u7684TableInfo
+ SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
+ TableInfo wrapperTableInfo = schemaConverter
+ .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
+ storePath);
+ wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
+ //\u52a0\u8f7d\u5230CarbonMetadata\u4ed3\u5e93
+ CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
+
+ cache.tableInfo = wrapperTableInfo;
+ cache.carbonTable = CarbonMetadata.getInstance().getCarbonTable(cache.carbonTableIdentifier.getTableUniqueName());
+ result = cache.carbonTable;
+ }catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return result;
+ }
+
+ public List<CarbonLocalInputSplit> getInputSplits2(CarbonTableCacheModel tableCacheModel, Expression filters) throws Exception {
+
+ //\u5904\u7406filter, \u4e0b\u63a8filter\uff0c\u5c06\u5e94\u7528\u5728Segment\u7684\u7d22\u5f15\u4e0a
+ FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+
+ AbsoluteTableIdentifier absoluteTableIdentifier = tableCacheModel.carbonTable.getAbsoluteTableIdentifier();
+ CacheClient cacheClient = new CacheClient(absoluteTableIdentifier.getStorePath());
+ List<String> invalidSegments = new ArrayList<>();
+ List<UpdateVO> invalidTimestampsList = new ArrayList<>();
+
+ // get all valid segments and set them into the configuration
+ SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+ SegmentStatusManager.ValidAndInvalidSegmentsInfo segments = segmentStatusManager.getValidAndInvalidSegments();
+
+ tableCacheModel.segments = segments.getValidSegments().toArray(new String[0]);
+ if (segments.getValidSegments().size() == 0) {
+ return new ArrayList<>(0);
+ }
+
+ // remove entry in the segment index if there are invalid segments
+ invalidSegments.addAll(segments.getInvalidSegments());
+ for (String invalidSegmentId : invalidSegments) {
+ invalidTimestampsList.add(updateStatusManager.getInvalidTimestampRange(invalidSegmentId));
+ }
+ if (invalidSegments.size() > 0) {
+ List<TableSegmentUniqueIdentifier> invalidSegmentsIds = new ArrayList<>(invalidSegments.size());
+ for(String segId: invalidSegments) {
+ invalidSegmentsIds.add(new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segId));
+ }
+ cacheClient.getSegmentAccessClient().invalidateAll(invalidSegmentsIds);
+ }
+
+ // get filter for segment
+ CarbonInputFormatUtil.processFilterExpression(filters, tableCacheModel.carbonTable);
+ FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filters, tableCacheModel.carbonTable.getAbsoluteTableIdentifier());
+
+ List<CarbonLocalInputSplit> result = new ArrayList<>();
+ //for each segment fetch blocks matching filter in Driver BTree
+ for (String segmentNo : tableCacheModel.segments) {
+ try{
+ List<DataRefNode> dataRefNodes = getDataBlocksOfSegment(filterExpressionProcessor, absoluteTableIdentifier,tableCacheModel.carbonTablePath, filterInterface, segmentNo, cacheClient, updateStatusManager);
+ for (DataRefNode dataRefNode : dataRefNodes) {
+ BlockBTreeLeafNode leafNode = (BlockBTreeLeafNode) dataRefNode;
+ TableBlockInfo tableBlockInfo = leafNode.getTableBlockInfo();
+
+ if (CarbonUtil.isInvalidTableBlock(tableBlockInfo, updateStatusManager.getInvalidTimestampRange(tableBlockInfo.getSegmentId()), updateStatusManager)) {
+ continue;
+ }
+ result.add(new CarbonLocalInputSplit(segmentNo, tableBlockInfo.getFilePath(),
+ tableBlockInfo.getBlockOffset(), tableBlockInfo.getBlockLength(),
+ Arrays.asList(tableBlockInfo.getLocations()), tableBlockInfo.getBlockletInfos().getNoOfBlockLets(),
+ tableBlockInfo.getVersion().number()));
+ }
+ }catch (Exception ex){
+ throw new RuntimeException(ex);
+ }
+ }
+ cacheClient.close();
+ return result;
+ }
+
+ /**
+ * get data blocks of given segment
+ */
+ private List<DataRefNode> getDataBlocksOfSegment(FilterExpressionProcessor filterExpressionProcessor,
+ AbsoluteTableIdentifier absoluteTableIdentifier,
+ CarbonTablePath tablePath,
+ FilterResolverIntf resolver,
+ String segmentId,
+ CacheClient cacheClient,
+ SegmentUpdateStatusManager updateStatusManager) throws IOException {
+ //DriverQueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.getQueryStatisticsRecorderInstance();
+ //QueryStatistic statistic = new QueryStatistic();
+
+ //\u8bfb\u53d6Segment \u5185\u90e8\u7684Index
+ Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap =
+ getSegmentAbstractIndexs(absoluteTableIdentifier, tablePath, segmentId, cacheClient,
+ updateStatusManager);
+
+ List<DataRefNode> resultFilterredBlocks = new LinkedList<DataRefNode>();
+
+ if (null != segmentIndexMap) {
+ // build result
+ for (AbstractIndex abstractIndex : segmentIndexMap.values()) {
+ List<DataRefNode> filterredBlocks;
+ // if no filter is given get all blocks from Btree Index
+ if (null == resolver) {
+ filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+ } else {
+ //ignore filter
+ //filterredBlocks = getDataBlocksOfIndex(abstractIndex);
+
+ // apply filter and get matching blocks
+ filterredBlocks = filterExpressionProcessor
+ .getFilterredBlocks(abstractIndex.getDataRefNode(), resolver, abstractIndex,
+ absoluteTableIdentifier);
+ }
+ resultFilterredBlocks.addAll(filterredBlocks);
+ }
+ }
+ //statistic.addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ //recorder.recordStatisticsForDriver(statistic, "123456"/*job.getConfiguration().get("query.id")*/);
+ return resultFilterredBlocks;
+ }
+
+ private boolean isSegmentUpdate(SegmentTaskIndexWrapper segmentTaskIndexWrapper,
+ UpdateVO updateDetails) {
+ if (null != updateDetails.getLatestUpdateTimestamp()
+ && updateDetails.getLatestUpdateTimestamp() > segmentTaskIndexWrapper
+ .getRefreshedTimeStamp()) {
+ return true;
+ }
+ return false;
+ }
+
+ private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(/*JobContext job,*/
+ AbsoluteTableIdentifier absoluteTableIdentifier,
+ CarbonTablePath tablePath,
+ String segmentId,
+ CacheClient cacheClient,
+ SegmentUpdateStatusManager updateStatusManager) throws IOException {
+ Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
+ SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
+ boolean isSegmentUpdated = false;
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
+ TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
+ new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
+ segmentTaskIndexWrapper =
+ cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
+ UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
+ if (null != segmentTaskIndexWrapper) {
+ segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
+ taskKeys = segmentIndexMap.keySet();
+ isSegmentUpdated = true;
+ }
+ }
+
+ // if segment tree is not loaded, load the segment tree
+ if (segmentIndexMap == null || isSegmentUpdated) {
+
+ List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
+ List<String> segs = new ArrayList<>();
+ segs.add(segmentId);
+
+ FileSystem fs = getFileStatusOfSegments(new String[]{segmentId}, tablePath, fileStatusList);
+ List<InputSplit> splits = getSplit(fileStatusList, fs);
+
+ List<FileSplit> carbonSplits = new ArrayList<>();
+ for (InputSplit inputSplit : splits) {
+ FileSplit fileSplit = (FileSplit) inputSplit;
+ String segId = CarbonTablePath.DataPathUtil.getSegmentId(fileSplit.getPath().toString());//\u8fd9\u91cc\u7684seperator\u5e94\u8be5\u600e\u4e48\u52a0\uff1f\uff1f
+ if (segId.equals(CarbonCommonConstants.INVALID_SEGMENT_ID)) {
+ continue;
+ }
+ carbonSplits.add(fileSplit);
+ }
+
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+ for (FileSplit inputSplit : carbonSplits) {
+ if (isValidBlockBasedOnUpdateDetails(taskKeys, inputSplit, updateDetails, updateStatusManager, segmentId)) {
+
+ BlockletInfos blockletInfos = new BlockletInfos(0, 0, 0);//this level we do not need blocklet info!!!! Is this a trick?
+ tableBlockInfoList.add(
+ new TableBlockInfo(inputSplit.getPath().toString(),
+ inputSplit.getStart(),
+ segmentId,
+ inputSplit.getLocations(),
+ inputSplit.getLength(),
+ blockletInfos,
+ ColumnarFormatVersion.valueOf(CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION),
+ null/*new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE)*/));//\u8fd9\u91cc\u7684null\u662f\u5426\u4f1a\u5f02\u5e38\uff1f
+ }
+ }
+
+ Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
+ segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
+ // get Btree blocks for given segment
+ tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
+ tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
+ segmentTaskIndexWrapper =
+ cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
+ segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
+ }
+ return segmentIndexMap;
+ }
+
+ private boolean isValidBlockBasedOnUpdateDetails(
+ Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, FileSplit carbonInputSplit,
+ UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
+ String taskID = null;
+ if (null != carbonInputSplit) {
+ if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
+ return false;
+ }
+
+ if (null == taskKeys) {
+ return true;
+ }
+
+ taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
+ String bucketNo =
+ CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
+
+ SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
+ new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
+
+ String blockTimestamp = carbonInputSplit.getPath().getName()
+ .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
+ carbonInputSplit.getPath().getName().lastIndexOf('.'));
+ if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
+ && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
+ if (!taskKeys.contains(taskBucketHolder)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private List<InputSplit> getSplit(List<FileStatus> fileStatusList, FileSystem targetSystem) throws IOException {
+
+ Iterator split = fileStatusList.iterator();
+
+ List<InputSplit> splits = new ArrayList<>();
+
+ while (true)
+ {
+ while (true)
+ {
+ while(split.hasNext()) {
+ FileStatus file = (FileStatus) split.next();
+ Path path = file.getPath();
+ long length = file.getLen();
+ if (length != 0L) {
+ BlockLocation[] blkLocations;
+ if (file instanceof LocatedFileStatus) {
+ blkLocations = ((LocatedFileStatus) file).getBlockLocations();
+ } else {
+ blkLocations = targetSystem.getFileBlockLocations(file, 0L, length);
+ }
+
+ if (this.isSplitable()) {
+ long blockSize1 = file.getBlockSize();
+ long splitSize = this.computeSplitSize(blockSize1, 1, Long.MAX_VALUE);
+
+ long bytesRemaining;
+ int blkIndex;
+ for (bytesRemaining = length; (double) bytesRemaining / (double) splitSize > 1.1D; bytesRemaining -= splitSize) {
+ blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts()));
+ }
+
+ if (bytesRemaining != 0L) {
+ blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
+ splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts()));
+ }
+ }
+ else
+ {
+ splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, blkLocations[0].getHosts()));
+ }
+ }
+ else {
+ splits.add(new org.apache.hadoop.mapreduce.lib.input.FileSplit(path, 0L, length, new String[0]));
+ }
+ }
+ return splits;
+ }
+ }
+
+ }
+
+ private String[] getValidPartitions() {
+ //TODO: has to Identify partitions by partition pruning
+ return new String[] { "0" };
+ }
+
+ private FileSystem getFileStatusOfSegments(String[] segmentsToConsider,
+ CarbonTablePath tablePath,
+ List<FileStatus> result) throws IOException {
+ String[] partitionsToConsider = getValidPartitions();
+ if (partitionsToConsider.length == 0) {
+ throw new IOException("No partitions/data found");
+ }
+
+ FileSystem fs = null;
+
+ //PathFilter inputFilter = getDataFileFilter(job);
+
+ // get tokens for all the required FileSystem for table path
+ /*TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { tablePath },
+ job.getConfiguration());*/
+
+ //get all data files of valid partitions and segments
+ for (int i = 0; i < partitionsToConsider.length; ++i) {
+ String partition = partitionsToConsider[i];
+
+ for (int j = 0; j < segmentsToConsider.length; ++j) {
+ String segmentId = segmentsToConsider[j];
+ Path segmentPath = new Path(tablePath.getCarbonDataDirectoryPath(partition, segmentId));
+
+ try{
+ Configuration conf = new Configuration();
+ fs = segmentPath.getFileSystem(conf);
+ //fs.initialize(segmentPath.toUri(), conf);
+
+ RemoteIterator<LocatedFileStatus> iter = fs.listLocatedStatus(segmentPath);
+ while (iter.hasNext()) {
+ LocatedFileStatus stat = iter.next();
+ //if(stat.getPath().toString().contains("carbondata"))//\u53c2\u770bcarbondata\u7684carbonInputFilter\u7684\u5b9e\u73b0
+ if (DefaultFilter.accept(stat.getPath()))
+ {
+ if (stat.isDirectory()) {
+ addInputPathRecursively(result, fs, stat.getPath(), DefaultFilter);
+ } else {
+ result.add(stat);
+ }
+ }
+ }
+ }catch (Exception ex){
+ System.out.println(ex.toString());
+ }
+ }
+ }
+ return fs;
+ }
+
+ protected void addInputPathRecursively(List<FileStatus> result, FileSystem fs, Path path, PathFilter inputFilter) throws IOException {
+ RemoteIterator iter = fs.listLocatedStatus(path);
+
+ while(iter.hasNext()) {
+ LocatedFileStatus stat = (LocatedFileStatus)iter.next();
+ if(inputFilter.accept(stat.getPath())) {
+ if(stat.isDirectory()) {
+ this.addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+ } else {
+ result.add(stat);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * get data blocks of given btree
+ */
+ private List<DataRefNode> getDataBlocksOfIndex(AbstractIndex abstractIndex) {
+ List<DataRefNode> blocks = new LinkedList<DataRefNode>();
+ SegmentProperties segmentProperties = abstractIndex.getSegmentProperties();
+
+ try {
+ IndexKey startIndexKey = FilterUtil.prepareDefaultStartIndexKey(segmentProperties);
+ IndexKey endIndexKey = FilterUtil.prepareDefaultEndIndexKey(segmentProperties);
+
+ // Add all blocks of btree into result
+ DataRefNodeFinder blockFinder =
+ new BTreeDataRefNodeFinder(segmentProperties.getEachDimColumnValueSize());
+ DataRefNode startBlock =
+ blockFinder.findFirstDataBlock(abstractIndex.getDataRefNode(), startIndexKey);
+ DataRefNode endBlock =
+ blockFinder.findLastDataBlock(abstractIndex.getDataRefNode(), endIndexKey);
+ while (startBlock != endBlock) {
+ blocks.add(startBlock);
+ startBlock = startBlock.getNextDataRefNode();
+ }
+ blocks.add(endBlock);
+
+ } catch (KeyGenException e) {
+ System.out.println("Could not generate start key" + e.getMessage());
+ }
+ return blocks;
+ }
+
+ private boolean isSplitable() {
+ try {
+ // Don't split the file if it is local file system
+ if(this.fileType == FileFactory.FileType.LOCAL)
+ {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ private long computeSplitSize(long blockSize, long minSize,
+ long maxSize) {
+ return Math.max(minSize, Math.min(maxSize, blockSize));
+ }
+
+ private FileSplit makeSplit(Path file, long start, long length,
+ String[] hosts) {
+ return new FileSplit(file, start, length, hosts);
+ }
+
+ private int getBlockIndex(BlockLocation[] blkLocations,
+ long offset) {
+ for (int i = 0 ; i < blkLocations.length; i++) {
+ // is the offset inside this block?
+ if ((blkLocations[i].getOffset() <= offset) &&
+ (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())){
+ return i;
+ }
+ }
+ BlockLocation last = blkLocations[blkLocations.length -1];
+ long fileLength = last.getOffset() + last.getLength() -1;
+ throw new IllegalArgumentException("Offset " + offset +
+ " is outside of file (0.." +
+ fileLength + ")");
+ }
+
+
+ /**
+ * get total number of rows. for count(*)
+ *
+ * @throws IOException
+ * @throws IndexBuilderException
+ */
+ public long getRowCount() throws IOException, IndexBuilderException {
+ long rowCount = 0;
+ /*AbsoluteTableIdentifier absoluteTableIdentifier = this.carbonTable.getAbsoluteTableIdentifier();
+
+ // no of core to load the blocks in driver
+ //addSegmentsIfEmpty(job, absoluteTableIdentifier);
+ int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+ try {
+ numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT));
+ } catch (NumberFormatException e) {
+ numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE;
+ }
+ // creating a thread pool
+ ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores);
+ List<Future<Map<String, AbstractIndex>>> loadedBlocks =
+ new ArrayList<Future<Map<String, AbstractIndex>>>();
+ //for each segment fetch blocks matching filter in Driver BTree
+ for (String segmentNo : this.segmentList) {
+ // submitting the task
+ loadedBlocks
+ .add(threadPool.submit(new BlocksLoaderThread(*//*job,*//* absoluteTableIdentifier, segmentNo)));
+ }
+ threadPool.shutdown();
+ try {
+ threadPool.awaitTermination(1, TimeUnit.HOURS);
+ } catch (InterruptedException e) {
+ throw new IndexBuilderException(e);
+ }
+ try {
+ // adding all the rows of the blocks to get the total row
+ // count
+ for (Future<Map<String, AbstractIndex>> block : loadedBlocks) {
+ for (AbstractIndex abstractIndex : block.get().values()) {
+ rowCount += abstractIndex.getTotalNumberOfRows();
+ }
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ throw new IndexBuilderException(e);
+ }*/
+ return rowCount;
+ }
+}
[2/3] incubator-carbondata git commit: add presto integration 0.0.1
Posted by ch...@apache.org.
add presto integration 0.0.1
update pom & annotation
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/56720871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/56720871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/56720871
Branch: refs/heads/master
Commit: 56720871a70d0bd3947e54cd5f9353b510e9fb82
Parents: e441ab0
Author: ffpeng90 <ff...@126.com>
Authored: Sun Mar 12 20:27:32 2017 +0800
Committer: chenliang613 <ch...@huawei.com>
Committed: Thu Mar 23 10:41:51 2017 +0530
----------------------------------------------------------------------
integration/presto/pom.xml | 167 +++++
integration/presto/src/checkstyle/checks.xml | 7 +
.../presto/src/license/LICENSE-HEADER.txt | 11 +
.../carbondata/CarbondataColumnConstraint.java | 99 +++
.../carbondata/CarbondataColumnHandle.java | 159 ++++
.../presto/carbondata/CarbondataConnector.java | 84 +++
.../carbondata/CarbondataConnectorFactory.java | 95 +++
.../carbondata/CarbondataConnectorId.java | 57 ++
.../carbondata/CarbondataHandleResolver.java | 45 ++
.../presto/carbondata/CarbondataMetadata.java | 305 ++++++++
.../presto/carbondata/CarbondataModule.java | 79 ++
.../presto/carbondata/CarbondataPlugin.java | 32 +
.../carbondata/CarbondataRecordCursor.java | 152 ++++
.../presto/carbondata/CarbondataRecordSet.java | 111 +++
.../carbondata/CarbondataRecordSetProvider.java | 292 ++++++++
.../presto/carbondata/CarbondataSplit.java | 92 +++
.../carbondata/CarbondataSplitManager.java | 300 ++++++++
.../carbondata/CarbondataTableHandle.java | 80 ++
.../carbondata/CarbondataTableLayoutHandle.java | 83 +++
.../carbondata/CarbondataTransactionHandle.java | 22 +
.../com/facebook/presto/carbondata/Types.java | 34 +
.../carbondata/impl/CarbonLocalInputSplit.java | 89 +++
.../carbondata/impl/CarbonTableCacheModel.java | 40 +
.../carbondata/impl/CarbonTableConfig.java | 58 ++
.../carbondata/impl/CarbonTableReader.java | 736 +++++++++++++++++++
25 files changed, 3229 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/pom.xml
----------------------------------------------------------------------
diff --git a/integration/presto/pom.xml b/integration/presto/pom.xml
new file mode 100644
index 0000000..b9a7e34
--- /dev/null
+++ b/integration/presto/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-root</artifactId>
+ <version>0.153</version>
+ </parent>
+
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-carbondata</artifactId>
+ <packaging>presto-plugin</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-spi</artifactId>
+ <version>0.153</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.3</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-core</artifactId>
+ <version>1.0.0-incubating</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-common</artifactId>
+ <version>1.0.0-incubating</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>1.0.0-incubating</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>bootstrap</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>json</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>log</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ </dependency>
+
+ <!--presto integration-->
+
+ <dependency>
+ <groupId>com.facebook.presto</groupId>
+ <artifactId>presto-spi</artifactId>
+ <version>0.153</version>
+ <!--<scope>provided</scope>-->
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>slice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>io.airlift</groupId>
+ <artifactId>units</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.facebook.presto.hadoop</groupId>
+ <artifactId>hadoop-apache2</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-enforcer-plugin</artifactId>
+ <version>1.4.1</version>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>com.ning.maven.plugins</groupId>
+ <artifactId>maven-dependency-versions-check-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ <failBuildInCaseOfConflict>false</failBuildInCaseOfConflict>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>com.ning.maven.plugins</groupId>
+ <artifactId>maven-duplicate-finder-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>pl.project13.maven</groupId>
+ <artifactId>git-commit-id-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/checkstyle/checks.xml
----------------------------------------------------------------------
diff --git a/integration/presto/src/checkstyle/checks.xml b/integration/presto/src/checkstyle/checks.xml
new file mode 100755
index 0000000..fb6f41d
--- /dev/null
+++ b/integration/presto/src/checkstyle/checks.xml
@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE module PUBLIC
+ "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+ "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<module name="Checker">
+
+</module>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/license/LICENSE-HEADER.txt
----------------------------------------------------------------------
diff --git a/integration/presto/src/license/LICENSE-HEADER.txt b/integration/presto/src/license/LICENSE-HEADER.txt
new file mode 100755
index 0000000..d955a86
--- /dev/null
+++ b/integration/presto/src/license/LICENSE-HEADER.txt
@@ -0,0 +1,11 @@
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
new file mode 100755
index 0000000..29d2076
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnConstraint.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.predicate.Domain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSetter;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataColumnConstraint {
+ private final String name;
+ private final boolean invertedindexed;
+ private Optional<Domain> domain;
+
+ @JsonCreator
+ public CarbondataColumnConstraint(
+ @JsonProperty("name") String name,
+ @JsonProperty("domain") Optional<Domain> domain,
+ @JsonProperty("invertedindexed") boolean invertedindexed)
+ {
+ this.name = requireNonNull(name, "name is null");
+ this.invertedindexed = requireNonNull(invertedindexed, "invertedIndexed is null");
+ this.domain = requireNonNull(domain, "domain is null");
+ }
+
+ @JsonProperty
+ public boolean isInvertedindexed()
+ {
+ return invertedindexed;
+ }
+
+ @JsonProperty
+ public String getName()
+ {
+ return name;
+ }
+
+ @JsonProperty
+ public Optional<Domain> getDomain()
+ {
+ return domain;
+ }
+
+ @JsonSetter
+ public void setDomain(Optional<Domain> domain)
+ {
+ this.domain = domain;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(name, domain, invertedindexed);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+
+ if ((obj == null) || (getClass() != obj.getClass())) {
+ return false;
+ }
+
+ CarbondataColumnConstraint other = (CarbondataColumnConstraint) obj;
+ return Objects.equals(this.name, other.name)
+ && Objects.equals(this.domain, other.domain)
+ && Objects.equals(this.invertedindexed, other.invertedindexed);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("name", this.name)
+ .add("invertedindexed", this.invertedindexed)
+ .add("domain", this.domain)
+ .toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
new file mode 100755
index 0000000..49d140a
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataColumnHandle.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.type.Type;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link ColumnHandle} for one Carbondata column (a dimension or, when
+ * {@link #isMeasure()} is true, a measure).
+ * <p>
+ * Instances are rebuilt by Jackson through the {@link JsonCreator} constructor. Every creator
+ * property is therefore also exposed by a getter annotated with the SAME {@link JsonProperty}
+ * name. Without that, a property may not be written on serialization and would come back as its
+ * default (0 / false / null). This happens when getter auto-detection is disabled, and always for
+ * is-getters, whose implicit name drops the "is" prefix.
+ * <p>
+ * Equality and hashing use only the connector id and the column name.
+ */
+public class CarbondataColumnHandle
+        implements ColumnHandle
+{
+    private final String connectorId;
+    private final String columnName;
+    private final Type columnType;
+    /*
+     * Position of this column in the table's complete column list. It does not depend on the
+     * query: for "select clm3, clm0, clm1 from tablename" the handle of clm3 still has
+     * ordinalPosition = 3.
+     */
+    private final int ordinalPosition;
+    private final int keyOrdinal;
+    private final int columnGroupOrdinal;
+    private final boolean isMeasure;
+    private final int columnGroupId;
+    // Nullable: the constructor deliberately does not null-check it.
+    private final String columnUniqueId;
+    private final boolean isInvertedIndex;
+
+    /**
+     * @throws NullPointerException if {@code connectorId}, {@code columnName} or
+     *         {@code columnType} is null ({@code columnUniqueId} may be null)
+     */
+    @JsonCreator
+    public CarbondataColumnHandle(
+            @JsonProperty("connectorId") String connectorId,
+            @JsonProperty("columnName") String columnName,
+            @JsonProperty("columnType") Type columnType,
+            @JsonProperty("ordinalPosition") int ordinalPosition,
+            @JsonProperty("keyOrdinal") int keyOrdinal,
+            @JsonProperty("columnGroupOrdinal") int columnGroupOrdinal,
+            @JsonProperty("isMeasure") boolean isMeasure,
+            @JsonProperty("columnGroupId") int columnGroupId,
+            @JsonProperty("columnUniqueId") String columnUniqueId,
+            @JsonProperty("isInvertedIndex") boolean isInvertedIndex)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null");
+        this.columnName = requireNonNull(columnName, "columnName is null");
+        this.columnType = requireNonNull(columnType, "columnType is null");
+        // Primitive arguments can never be null, so they are assigned without null checks.
+        this.ordinalPosition = ordinalPosition;
+        this.keyOrdinal = keyOrdinal;
+        this.columnGroupOrdinal = columnGroupOrdinal;
+        this.isMeasure = isMeasure;
+        this.columnGroupId = columnGroupId;
+        this.columnUniqueId = columnUniqueId;
+        this.isInvertedIndex = isInvertedIndex;
+    }
+
+    @JsonProperty
+    public String getConnectorId()
+    {
+        return connectorId;
+    }
+
+    @JsonProperty
+    public String getColumnName()
+    {
+        return columnName;
+    }
+
+    @JsonProperty
+    public Type getColumnType()
+    {
+        return columnType;
+    }
+
+    @JsonProperty
+    public int getOrdinalPosition()
+    {
+        return ordinalPosition;
+    }
+
+    @JsonProperty("keyOrdinal")
+    public int getKeyOrdinal()
+    {
+        return keyOrdinal;
+    }
+
+    @JsonProperty("columnGroupOrdinal")
+    public int getColumnGroupOrdinal()
+    {
+        return columnGroupOrdinal;
+    }
+
+    /** Returns whether this handle describes a measure column. */
+    @JsonProperty("isMeasure")
+    public boolean isMeasure()
+    {
+        return isMeasure;
+    }
+
+    @JsonProperty("columnGroupId")
+    public int getColumnGroupId()
+    {
+        return columnGroupId;
+    }
+
+    /** Returns the column's unique id; may be null. */
+    @JsonProperty("columnUniqueId")
+    public String getColumnUniqueId()
+    {
+        return columnUniqueId;
+    }
+
+    @JsonProperty("isInvertedIndex")
+    public boolean isInvertedIndex()
+    {
+        return isInvertedIndex;
+    }
+
+    /** Builds Presto column metadata from the column name and type (no comment, not hidden). */
+    public ColumnMetadata getColumnMetadata()
+    {
+        return new ColumnMetadata(columnName, columnType, null, false);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(connectorId, columnName);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (this == obj) {
+            return true;
+        }
+        if ((obj == null) || (getClass() != obj.getClass())) {
+            return false;
+        }
+
+        CarbondataColumnHandle other = (CarbondataColumnHandle) obj;
+        return Objects.equals(this.connectorId, other.connectorId) &&
+                Objects.equals(this.columnName, other.columnName);
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this)
+                .add("connectorId", connectorId)
+                .add("columnName", columnName)
+                .add("columnType", columnType)
+                .add("ordinalPosition", ordinalPosition)
+                .toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
new file mode 100755
index 0000000..7d8fdee
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.log.Logger;
+
+import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
+import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto {@link Connector} for Carbondata. It hands out the metadata, split manager and record
+ * set provider it was constructed with. It checks requested isolation levels against
+ * READ_COMMITTED.
+ */
+public class CarbondataConnector
+        implements Connector
+{
+    private static final Logger log = Logger.get(CarbondataConnector.class);
+
+    private final LifeCycleManager lifeCycleManager;
+    private final CarbondataMetadata metadata;
+    private final ConnectorSplitManager splitManager;
+    private final ConnectorRecordSetProvider recordSetProvider;
+    private final ClassLoader classLoader;
+
+    /**
+     * @throws NullPointerException if any argument is null
+     */
+    public CarbondataConnector(
+            LifeCycleManager lifeCycleManager,
+            CarbondataMetadata metadata,
+            ConnectorSplitManager splitManager,
+            ConnectorRecordSetProvider recordSetProvider,
+            ClassLoader classLoader)
+    {
+        this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+        this.metadata = requireNonNull(metadata, "metadata is null");
+        this.splitManager = requireNonNull(splitManager, "splitManager is null");
+        this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    /** Validates the isolation level and returns the shared transaction handle instance. */
+    @Override
+    public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+    {
+        checkConnectorSupports(READ_COMMITTED, isolationLevel);
+        return CarbondataTransactionHandle.INSTANCE;
+    }
+
+    /** Returns the single metadata instance after attaching this connector's class loader to it. */
+    @Override
+    public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+    {
+        CarbondataMetadata shared = metadata;
+        shared.putClassLoader(classLoader);
+        return shared;
+    }
+
+    @Override
+    public ConnectorSplitManager getSplitManager()
+    {
+        return splitManager;
+    }
+
+    @Override
+    public ConnectorRecordSetProvider getRecordSetProvider()
+    {
+        return recordSetProvider;
+    }
+
+    /** Stops the lifecycle manager; failures are logged rather than rethrown. */
+    @Override
+    public final void shutdown()
+    {
+        try {
+            lifeCycleManager.stop();
+        }
+        catch (Exception e) {
+            log.error(e, "Error shutting down connector");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
new file mode 100755
index 0000000..09a3717
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorFactory.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.*;
+import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorSplitManager;
+import com.google.common.base.Throwables;
+import com.google.inject.Injector;
+import io.airlift.bootstrap.Bootstrap;
+import io.airlift.bootstrap.LifeCycleManager;
+import io.airlift.json.JsonModule;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ConnectorFactory} that bootstraps the Carbondata Guice modules and wires the resulting
+ * components into a {@link CarbondataConnector}.
+ */
+public class CarbondataConnectorFactory
+        implements ConnectorFactory
+{
+    private final String name;
+    private final ClassLoader classLoader;
+
+    /**
+     * @param connectorName the name returned by {@link #getName()}; must not be null
+     * @param classLoader the plugin class loader used during bootstrap and passed to the connector
+     * @throws NullPointerException if either argument is null
+     */
+    public CarbondataConnectorFactory(String connectorName, ClassLoader classLoader)
+    {
+        this.name = requireNonNull(connectorName, "connectorName is null");
+        this.classLoader = requireNonNull(classLoader, "classLoader is null");
+    }
+
+    @Override
+    public String getName()
+    {
+        return name;
+    }
+
+    @Override
+    public ConnectorHandleResolver getHandleResolver()
+    {
+        return new CarbondataHandleResolver();
+    }
+
+    /**
+     * Creates a connector instance from the given catalog configuration.
+     *
+     * @throws NullPointerException if {@code config} or {@code context} is null
+     */
+    @Override
+    public Connector create(String connectorId, Map<String, String> config, ConnectorContext context)
+    {
+        requireNonNull(config, "config is null");
+        requireNonNull(context, "context is null");
+
+        // Bootstrap with the plugin class loader installed as the thread context class loader.
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            Bootstrap app = new Bootstrap(new JsonModule(), new CarbondataModule(connectorId, context.getTypeManager()));
+
+            Injector injector = app
+                    .strictConfig()
+                    .doNotInitializeLogging()
+                    .setRequiredConfigurationProperties(config)
+                    .initialize();
+
+            LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
+            CarbondataMetadata metadata = injector.getInstance(CarbondataMetadata.class);
+            ConnectorSplitManager splitManager = injector.getInstance(ConnectorSplitManager.class);
+            ConnectorRecordSetProvider connectorRecordSet = injector.getInstance(ConnectorRecordSetProvider.class);
+
+            // Only the split manager is wrapped in a class-loader-safe delegate; the record set
+            // provider is passed through unwrapped.
+            return new CarbondataConnector(
+                    lifeCycleManager,
+                    metadata,
+                    new ClassLoaderSafeConnectorSplitManager(splitManager, classLoader),
+                    connectorRecordSet,
+                    classLoader);
+        }
+        catch (Exception e) {
+            throw Throwables.propagate(e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
new file mode 100755
index 0000000..09289e3
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataConnectorId.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.google.inject.Inject;
+
+import java.util.Objects;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Value object wrapping the connector id string; equality and hashing are based on that string.
+ */
+public class CarbondataConnectorId
+{
+    private final String id;
+
+    /**
+     * @param id the connector id; must not be null
+     * @throws NullPointerException if {@code id} is null
+     */
+    @Inject
+    public CarbondataConnectorId(String id)
+    {
+        this.id = requireNonNull(id, "id is null");
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        if (obj == this) {
+            return true;
+        }
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        CarbondataConnectorId that = (CarbondataConnectorId) obj;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(id);
+    }
+
+    /** Returns the raw connector id. */
+    @Override
+    public String toString()
+    {
+        return id;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
new file mode 100755
index 0000000..729d9e5
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataHandleResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public class CarbondataHandleResolver implements ConnectorHandleResolver {
+ @Override
+ public Class<? extends ConnectorTableHandle> getTableHandleClass() {
+ return CarbondataTableHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorTableLayoutHandle> getTableLayoutHandleClass() {
+ return CarbondataTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class<? extends ColumnHandle> getColumnHandleClass() {
+ return CarbondataColumnHandle.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorSplit> getSplitClass() {
+ return CarbondataSplit.class;
+ }
+
+ @Override
+ public Class<? extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return CarbondataTransactionHandle.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
new file mode 100755
index 0000000..2866149
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataMetadata.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+import javax.inject.Inject;
+import java.util.*;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataMetadata
+ implements ConnectorMetadata
+{
+    private final String connectorId;
+    private CarbonTableReader carbonTableReader;
+    // Set after construction via putClassLoader().
+    private ClassLoader classLoader;
+
+    // Most recent map built by getColumnHandles().
+    private Map<String, ColumnHandle> columnHandleMap;
+
+    /**
+     * @param connectorId id of the owning connector; its string form is stored
+     * @param reader reader used to look up Carbondata schemas and tables
+     * @throws NullPointerException if either argument is null
+     */
+    @Inject
+    public CarbondataMetadata(CarbondataConnectorId connectorId, CarbonTableReader reader)
+    {
+        this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+        // The message names the actual parameter ("reader"); it previously said "client".
+        this.carbonTableReader = requireNonNull(reader, "reader is null");
+    }
+
+
+    /**
+     * Stores the class loader that, for example, listSchemaNamesInternal() installs as the
+     * thread context class loader while querying the table reader.
+     */
+    public void putClassLoader(ClassLoader classLoader)
+    {
+        ClassLoader loader = classLoader;
+        this.classLoader = loader;
+    }
+
+
+ @Override
+ public List<String> listSchemaNames(ConnectorSession session) {
+ return listSchemaNamesInternal();
+ }
+
+
+    /**
+     * Returns the schema names known to the table reader. The lookup runs with
+     * {@code classLoader} installed as the thread context class loader.
+     */
+    public List<String> listSchemaNamesInternal()
+    {
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(classLoader)) {
+            return carbonTableReader.getSchemaNames();
+        }
+    }
+
+ @Override
+ public List<SchemaTableName> listTables(ConnectorSession session, String schemaNameOrNull) {
+
+ /*List<SchemaTableName> all = carbonTableReader.getTableList();
+ if(schemaNameOrNull != null)
+ {
+ return all.stream().filter(a -> schemaNameOrNull.equals(a.getSchemaName())).collect(Collectors.toList());
+ }
+ return all;*/
+
+ List<String> schemaNames;
+ if (schemaNameOrNull != null) {
+ schemaNames = ImmutableList.of(schemaNameOrNull);
+ }
+ else {
+ schemaNames = carbonTableReader.getSchemaNames();
+ }
+
+ ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
+ for (String schemaName : schemaNames) {
+ for (String tableName : carbonTableReader.getTableNames(schemaName)) {
+ builder.add(new SchemaTableName(schemaName, tableName));
+ }
+ }
+ return builder.build();
+ }
+
+    /**
+     * Returns the column metadata of every table matching {@code prefix}. Tables whose metadata
+     * cannot be resolved are skipped.
+     *
+     * @throws NullPointerException if {@code prefix} is null
+     */
+    @Override
+    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+    {
+        requireNonNull(prefix, "SchemaTablePrefix is null");
+
+        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> result = ImmutableMap.builder();
+        List<SchemaTableName> matchingTables = listTables(session, prefix);
+        for (SchemaTableName table : matchingTables) {
+            ConnectorTableMetadata tableMetadata = getTableMetadata(table);
+            if (tableMetadata == null) {
+                continue;
+            }
+            result.put(table, tableMetadata.getColumns());
+        }
+        return result.build();
+    }
+
+    /**
+     * Resolves a {@link SchemaTablePrefix} to concrete table names:
+     * <ul>
+     * <li>no schema and no table: every table of every schema;</li>
+     * <li>schema only: every table of that schema;</li>
+     * <li>schema and table: just that table (its existence is not checked here).</li>
+     * </ul>
+     */
+    private List<SchemaTableName> listTables(ConnectorSession session, SchemaTablePrefix prefix)
+    {
+        // Branch on the table name, not the schema name. A schema-only prefix has a null table
+        // name, and SchemaTableName rejects a null table name.
+        if (prefix.getTableName() == null) {
+            return listTables(session, prefix.getSchemaName());
+        }
+        return ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName()));
+    }
+
+    /**
+     * Builds Presto table metadata for {@code tableName}. Dimension columns come first, then
+     * measure columns; each type is mapped by CarbondataType2SpiMapper.
+     *
+     * @return the metadata, or {@code null} if the schema or the table is not found
+     */
+    private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
+    {
+        if (!listSchemaNamesInternal().contains(tableName.getSchemaName())) {
+            return null;
+        }
+
+        CarbonTable cb = carbonTableReader.getTable(tableName);
+        if (cb == null) {
+            return null;
+        }
+
+        // These are the columns reported by e.g. the SHOW COLUMNS command.
+        List<ColumnMetadata> spiCols = new ArrayList<>();
+        for (CarbonDimension col : cb.getDimensionByTableName(tableName.getTableName())) {
+            ColumnSchema schema = col.getColumnSchema();
+            spiCols.add(new ColumnMetadata(schema.getColumnName(), CarbondataType2SpiMapper(schema.getDataType())));
+        }
+        for (CarbonMeasure mcol : cb.getMeasureByTableName(tableName.getTableName())) {
+            ColumnSchema schema = mcol.getColumnSchema();
+            spiCols.add(new ColumnMetadata(schema.getColumnName(), CarbondataType2SpiMapper(schema.getDataType())));
+        }
+
+        // Wrap the CarbonTable's columns into Presto table metadata.
+        return new ConnectorTableMetadata(tableName, spiCols);
+    }
+
+    /**
+     * Builds a column handle for every dimension and measure of the table, keyed by column name.
+     * Dimensions come first; the ordinal position counts across both lists in that order.
+     * The resulting map is also stored in {@code columnHandleMap}.
+     *
+     * @throws SchemaNotFoundException if the handle's schema is unknown
+     * @throws TableNotFoundException if the table cannot be loaded
+     */
+    @Override
+    public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+    {
+        CarbondataTableHandle handle = checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+        checkArgument(handle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
+
+        String schemaName = handle.getSchemaTableName().getSchemaName();
+        if (!listSchemaNamesInternal().contains(schemaName)) {
+            throw new SchemaNotFoundException(schemaName);
+        }
+
+        // CarbonTable (Carbondata's own structure) is stored in CarbonMetadata.
+        CarbonTable cb = carbonTableReader.getTable(handle.getSchemaTableName());
+        if (cb == null) {
+            throw new TableNotFoundException(handle.getSchemaTableName());
+        }
+
+        ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
+        int index = 0;
+        String tableName = handle.getSchemaTableName().getTableName();
+        for (CarbonDimension column : cb.getDimensionByTableName(tableName)) {
+            ColumnSchema cs = column.getColumnSchema();
+            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+            columnHandles.put(
+                    cs.getColumnName(),
+                    new CarbondataColumnHandle(
+                            connectorId,
+                            cs.getColumnName(),
+                            spiType,
+                            index,
+                            column.getKeyOrdinal(),
+                            column.getColumnGroupOrdinal(),
+                            false,
+                            cs.getColumnGroupId(),
+                            cs.getColumnUniqueId(),
+                            cs.isUseInvertedIndex()));
+            index++;
+        }
+
+        for (CarbonMeasure measure : cb.getMeasureByTableName(tableName)) {
+            ColumnSchema cs = measure.getColumnSchema();
+            Type spiType = CarbondataType2SpiMapper(cs.getDataType());
+            // NOTE(review): measures pass the column group id as columnGroupOrdinal, while
+            // dimensions pass getColumnGroupOrdinal() — confirm this is intended.
+            columnHandles.put(
+                    cs.getColumnName(),
+                    new CarbondataColumnHandle(
+                            connectorId,
+                            cs.getColumnName(),
+                            spiType,
+                            index,
+                            measure.getOrdinal(),
+                            cs.getColumnGroupId(),
+                            true,
+                            cs.getColumnGroupId(),
+                            cs.getColumnUniqueId(),
+                            cs.isUseInvertedIndex()));
+            index++;
+        }
+
+        columnHandleMap = columnHandles.build();
+        return columnHandleMap;
+    }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) {
+
+ checkType(tableHandle, CarbondataTableHandle.class, "tableHandle");
+ return checkType(columnHandle, CarbondataColumnHandle.class, "columnHandle").getColumnMetadata();
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ //check tablename is valid
+ //schema is exist
+ //tables is exist
+
+ //CarbondataTable get from jar
+ return new CarbondataTableHandle(connectorId, tableName);
+ }
+
+ @Override
+ public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, Constraint<ColumnHandle> constraint, Optional<Set<ColumnHandle>> desiredColumns) {
+ CarbondataTableHandle handle = checkType(table, CarbondataTableHandle.class, "table");
+ ConnectorTableLayout layout = new ConnectorTableLayout(new CarbondataTableLayoutHandle(handle,constraint.getSummary()/*, constraint.getPredicateMap(),constraint.getFilterTuples()*/));
+ return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
+ return getTableMetadataInternal(table);
+ }
+
+ public ConnectorTableMetadata getTableMetadataInternal(ConnectorTableHandle table)
+ {
+ CarbondataTableHandle carbondataTableHandle = checkType(table, CarbondataTableHandle.class, "table");
+ checkArgument(carbondataTableHandle.getConnectorId().equals(connectorId), "tableHandle is not for this connector");
+ return getTableMetadata(carbondataTableHandle.getSchemaTableName());
+ }
+
+
+ public static Type CarbondataType2SpiMapper(DataType colType)
+ {
+ switch (colType)
+ {
+ case BOOLEAN:
+ return BooleanType.BOOLEAN;
+ case SHORT:
+ return SmallintType.SMALLINT;
+ case INT:
+ return IntegerType.INTEGER;
+ case LONG:
+ return BigintType.BIGINT;
+ case FLOAT:
+ case DOUBLE:
+ return DoubleType.DOUBLE;
+
+ case DECIMAL:
+ return DecimalType.createDecimalType();
+ case STRING:
+ return VarcharType.VARCHAR;
+ case DATE:
+ return DateType.DATE;
+ case TIMESTAMP:
+ return TimestampType.TIMESTAMP;
+
+ /*case DataType.MAP:
+ case DataType.ARRAY:
+ case DataType.STRUCT:
+ case DataType.NULL:*/
+
+ default:
+ return VarcharType.VARCHAR;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
new file mode 100755
index 0000000..af62a75
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataModule.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableConfig;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.type.Type;
+import com.facebook.presto.spi.type.TypeManager;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+import javax.inject.Inject;
+
+import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
+import static com.google.common.base.Preconditions.checkArgument;
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataModule implements Module {
+
+ private final String connectorId;
+ private final TypeManager typeManager;
+
+ public CarbondataModule(String connectorId, TypeManager typeManager)
+ {
+ this.connectorId = requireNonNull(connectorId, "connector id is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ public void configure(Binder binder) {
+ binder.bind(TypeManager.class).toInstance(typeManager);
+
+ binder.bind(CarbondataConnectorId.class).toInstance(new CarbondataConnectorId(connectorId));
+ binder.bind(CarbondataMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(CarbonTableReader.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorSplitManager.class).to(CarbondataSplitManager.class).in(Scopes.SINGLETON);
+ binder.bind(ConnectorRecordSetProvider.class).to(CarbondataRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(CarbondataHandleResolver.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(CarbonTableConfig.class);
+ }
+
+ public static final class TypeDeserializer
+ extends FromStringDeserializer<Type>
+ {
+ private final TypeManager typeManager;
+
+ @Inject
+ public TypeDeserializer(TypeManager typeManager)
+ {
+ super(Type.class);
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ }
+
+ @Override
+ protected Type _deserialize(String value, DeserializationContext context)
+ {
+ Type type = typeManager.getType(parseTypeSignature(value));
+ checkArgument(type != null, "Unknown type %s", value);
+ return type;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
new file mode 100755
index 0000000..bd6a156
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataPlugin.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+public class CarbondataPlugin implements Plugin {
+
+ @Override
+ public Iterable<ConnectorFactory> getConnectorFactories()
+ {
+ return ImmutableList.of(new CarbondataConnectorFactory("carbondata", getClassLoader()));
+ }
+
+ private static ClassLoader getClassLoader() {
+ return FileFactory.class.getClassLoader();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
new file mode 100755
index 0000000..43ca876
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordCursor.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.RecordCursor;
+import com.facebook.presto.spi.type.Type;
+import com.google.common.base.Strings;
+import io.airlift.log.Logger;
+import io.airlift.slice.Slice;
+import io.airlift.slice.Slices;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
+import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class CarbondataRecordCursor implements RecordCursor {
+
+ private static final Logger log = Logger.get(CarbondataRecordCursor.class);
+ private final List<CarbondataColumnHandle> columnHandles;
+
+ private List<String> fields;
+ private CarbondataSplit split;
+ private CarbonIterator<Object[]> rowCursor;
+ private CarbonReadSupport<Object[]> readSupport;
+
+ private long totalBytes;
+ private long nanoStart;
+ private long nanoEnd;
+
+ public CarbondataRecordCursor(CarbonReadSupport<Object[]> readSupport, CarbonIterator<Object[]> carbonIterator, List<CarbondataColumnHandle> columnHandles, CarbondataSplit split) {
+ this.rowCursor = carbonIterator;
+ this.columnHandles = columnHandles;
+ this.readSupport = readSupport;
+ this.totalBytes = 0;
+ }
+
+
+ @Override
+ public long getTotalBytes() {
+ return totalBytes;
+ }
+
+ @Override
+ public long getCompletedBytes() {
+ return totalBytes;
+ }
+
+ @Override
+ public long getReadTimeNanos() {
+ return nanoStart > 0L ? (nanoEnd == 0 ? System.nanoTime() : nanoEnd) - nanoStart : 0L;
+ }
+
+ @Override
+ public Type getType(int field) {
+
+ checkArgument(field < columnHandles.size(), "Invalid field index");
+ return columnHandles.get(field).getColumnType();
+ }
+
+ @Override
+ public boolean advanceNextPosition() {
+
+ if (nanoStart == 0) {
+ nanoStart = System.nanoTime();
+ }
+
+ if(rowCursor.hasNext())
+ {
+ fields = Stream.of(readSupport.readRow(rowCursor.next())).map(a -> a.toString()).collect(Collectors.toList());
+
+ totalBytes += fields.size();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean getBoolean(int field) {
+ checkFieldType(field, BOOLEAN);
+ return Boolean.parseBoolean(getFieldValue(field));
+ }
+
+ @Override
+ public long getLong(int field) {
+ String timeStr = getFieldValue(field);
+ Long milliSec = 0L;
+
+ //suppose the
+ return Math.round(Double.parseDouble(getFieldValue(field)));
+ }
+
+ @Override
+ public double getDouble(int field) {
+ checkFieldType(field, DOUBLE);
+ return Double.parseDouble(getFieldValue(field));
+ }
+
+ @Override
+ public Slice getSlice(int field) {
+ checkFieldType(field, VARCHAR);
+ return Slices.utf8Slice(getFieldValue(field));
+ }
+
+ @Override
+ public Object getObject(int field) {
+ return null;
+ }
+
+ @Override
+ public boolean isNull(int field) {
+ checkArgument(field < columnHandles.size(), "Invalid field index");
+ return Strings.isNullOrEmpty(getFieldValue(field));
+ }
+
+ String getFieldValue(int field)
+ {
+ checkState(fields != null, "Cursor has not been advanced yet");
+ return fields.get(field);
+ }
+
+ private void checkFieldType(int field, Type expected)
+ {
+ Type actual = getType(field);
+ checkArgument(actual.equals(expected), "Expected field %s to be type %s but is %s", field, expected, actual);
+ }
+
+ @Override
+ public void close() {
+ nanoEnd = System.nanoTime();
+
+ //todo delete cache from readSupport
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
new file mode 100755
index 0000000..622ef8a
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSet.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.spi.*;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.Type;
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodedReadSupportImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+
+public class CarbondataRecordSet implements RecordSet {
+
+ private CarbonTable carbonTable;
+ private TupleDomain<ColumnHandle> originalConstraint;
+ private Expression carbonConstraint;
+ private List<CarbondataColumnConstraint> rebuildConstraints;
+ private QueryModel queryModel;
+ private CarbondataSplit split;
+ private List<CarbondataColumnHandle> columns;
+ private QueryExecutor queryExecutor;
+
+ private CarbonReadSupport<Object[]> readSupport;
+
+ public CarbondataRecordSet(
+ CarbonTable carbonTable,
+ ConnectorSession session,
+ ConnectorSplit split,
+ List<CarbondataColumnHandle> columns,
+ QueryModel queryModel){
+ this.carbonTable = carbonTable;
+ this.split = checkType(split, CarbondataSplit.class, "connectorSplit");
+ this.originalConstraint = this.split.getConstraints();
+ this.rebuildConstraints = this.split.getRebuildConstraints();
+ this.queryModel = queryModel;
+ this.columns = columns;
+ this.readSupport = new DictionaryDecodedReadSupportImpl();
+ }
+
+ //todo support later
+ private Expression parseConstraint2Expression(TupleDomain<ColumnHandle> constraints) {
+ return null;
+ }
+
+ @Override
+ public List<Type> getColumnTypes() {
+ return columns.stream().map(a -> a.getColumnType()).collect(Collectors.toList());
+ }
+
+ @Override
+ public RecordCursor cursor() {
+ List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
+
+ //tableBlockInfoList.add(split.getLocalInputSplit().getTableBlockInfo());
+ /*BlockletInfos blockletInfos = new BlockletInfos(split.getLocalInputSplit().getNumberOfBlocklets(), 0,
+ split.getLocalInputSplit().getNumberOfBlocklets());*/
+ tableBlockInfoList.add(
+ new TableBlockInfo(split.getLocalInputSplit().getPath().toString(),
+ split.getLocalInputSplit().getStart(),
+ split.getLocalInputSplit().getSegmentId(),
+ split.getLocalInputSplit().getLocations().toArray(new String[0]),
+ split.getLocalInputSplit().getLength(),
+ //blockletInfos,
+ ColumnarFormatVersion.valueOf(split.getLocalInputSplit().getVersion())));
+ queryModel.setTableBlockInfos(tableBlockInfoList);
+
+ queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+
+ //queryModel.setQueryId(queryModel.getQueryId() + "_" + split.getLocalInputSplit().getSegmentId());
+ try {
+ readSupport.initialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier());
+ CarbonIterator<Object[]> carbonIterator = new ChunkRowIterator((CarbonIterator<BatchResult>) queryExecutor.execute(queryModel));
+ RecordCursor rc = new CarbondataRecordCursor(readSupport, carbonIterator, columns, split);
+ return rc;
+ } catch (QueryExecutionException e) {
+ //throw new InterruptedException(e.getMessage());
+ System.out.println(e.getMessage());
+ } catch(Exception ex) {
+ System.out.println(ex.toString());
+ }
+ return null;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
new file mode 100755
index 0000000..9e82b93
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataRecordSetProvider.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonTableCacheModel;
+import com.facebook.presto.carbondata.impl.CarbonTableReader;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.predicate.Domain;
+import com.facebook.presto.spi.predicate.Range;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.facebook.presto.spi.type.*;
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.*;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
+import org.apache.carbondata.core.scan.expression.logical.OrExpression;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import javax.inject.Inject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.facebook.presto.carbondata.Types.checkType;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static java.util.Objects.requireNonNull;
+
+public class CarbondataRecordSetProvider implements ConnectorRecordSetProvider {
+
+ private final String connectorId;
+ private final CarbonTableReader carbonTableReader;
+
+ @Inject
+ public CarbondataRecordSetProvider(
+ CarbondataConnectorId connectorId,
+ CarbonTableReader reader)
+ {
+ //this.config = requireNonNull(config, "config is null");
+ //this.connector = requireNonNull(connector, "connector is null");
+ this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
+ this.carbonTableReader = reader;
+ }
+
+ @Override
+ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List<? extends ColumnHandle> columns) {
+ requireNonNull(split, "split is null");
+ requireNonNull(columns, "columns is null");
+
+ // Convert split
+ CarbondataSplit cdSplit = checkType(split, CarbondataSplit.class, "split is not class CarbondataSplit");
+ checkArgument(cdSplit.getConnectorId().equals(connectorId), "split is not for this connector");
+
+ // Convert all columns handles
+ ImmutableList.Builder<CarbondataColumnHandle> handles = ImmutableList.builder();
+ for (ColumnHandle handle : columns) {
+ handles.add(checkType(handle, CarbondataColumnHandle.class, "handle"));
+ }
+
+ // Build column projection(check the column order)
+ String targetCols = "";
+ for(ColumnHandle col : columns){
+ targetCols += ((CarbondataColumnHandle)col).getColumnName() + ",";
+ }
+ targetCols = targetCols.substring(0, targetCols.length() -1 );
+ //String cols = String.join(",", columns.stream().map(a -> ((CarbondataColumnHandle)a).getColumnName()).collect(Collectors.toList()));
+
+ CarbonTableCacheModel tableCacheModel = carbonTableReader.getCarbonCache(cdSplit.getSchemaTableName());
+ checkNotNull(tableCacheModel, "tableCacheModel should not be null");
+ checkNotNull(tableCacheModel.carbonTable, "tableCacheModel.carbonTable should not be null");
+ checkNotNull(tableCacheModel.tableInfo, "tableCacheModel.tableInfo should not be null");
+
+ // Build Query Model
+ CarbonTable targetTable = tableCacheModel.carbonTable;
+ CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(targetTable, targetCols);
+ QueryModel queryModel = QueryModel.createModel(targetTable.getAbsoluteTableIdentifier(), queryPlan, targetTable);
+
+ // Push down filter
+ fillFilter2QueryModel(queryModel, cdSplit.getConstraints(), targetTable);
+
+ // Return new record set
+ return new CarbondataRecordSet(targetTable,/*connector,*/ session, /*config, */cdSplit, handles.build(), queryModel);
+ }
+
+ // Build filter for QueryModel (copy from CarbonInputFormat=> createRecordReader)
+ private void fillFilter2QueryModel(QueryModel queryModel, TupleDomain<ColumnHandle> originalConstraint, CarbonTable carbonTable) {
+
+ //queryModel.setFilterExpressionResolverTree(new FilterResolverIntf());
+
+ //Build Predicate Expression
+ ImmutableList.Builder<Expression> filters = ImmutableList.builder();
+
+ Domain domain = null;
+
+ for (ColumnHandle c : originalConstraint.getDomains().get().keySet()) {
+
+ // Build ColumnExpresstion for Expresstion(Carbondata)
+ CarbondataColumnHandle cdch = (CarbondataColumnHandle) c;
+ Type type = cdch.getColumnType();
+
+ DataType coltype = Spi2CarbondataTypeMapper(type);
+ Expression colExpression = new ColumnExpression(cdch.getColumnName(), coltype);
+
+ domain = originalConstraint.getDomains().get().get(c);
+ checkArgument(domain.getType().isOrderable(), "Domain type must be orderable");
+
+ if (domain.getValues().isNone()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.missingFilter(columnName));
+ //return domain.isNullAllowed() ? columnName + " IS NULL" : "FALSE";
+ //new Expression()
+ }
+
+ if (domain.getValues().isAll()) {
+ //return QueryBuilders.filteredQuery(null, FilterBuilders.existsFilter(columnName));
+ //return domain.isNullAllowed() ? "TRUE" : columnName + " IS NOT NULL";
+ }
+
+ List<Object> singleValues = new ArrayList<>();
+ List<Expression> rangeFilter = new ArrayList<>();
+ for (Range range : domain.getValues().getRanges().getOrderedRanges()) {
+ checkState(!range.isAll()); // Already checked
+ if (range.isSingleValue()) {
+ singleValues.add(range.getLow().getValue());
+ }
+ else
+ {
+ List<String> rangeConjuncts = new ArrayList<>();
+ if (!range.getLow().isLowerUnbounded()) {
+ Object value = ConvertDataByType(range.getLow().getValue(), type);
+ switch (range.getLow().getBound()) {
+ case ABOVE:
+ if (type == TimestampType.TIMESTAMP) {
+ //todo not now
+ } else {
+ GreaterThanExpression greater = new GreaterThanExpression(colExpression, new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ }
+ break;
+ case EXACTLY:
+ GreaterThanEqualToExpression greater = new GreaterThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+ //greater.setRangeExpression(true);
+ rangeFilter.add(greater);
+ break;
+ case BELOW:
+ throw new IllegalArgumentException("Low marker should never use BELOW bound");
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getLow().getBound());
+ }
+ }
+ if (!range.getHigh().isUpperUnbounded()) {
+ Object value = ConvertDataByType(range.getHigh().getValue(), type);
+ switch (range.getHigh().getBound()) {
+ case ABOVE:
+ throw new IllegalArgumentException("High marker should never use ABOVE bound");
+ case EXACTLY:
+ LessThanEqualToExpression less = new LessThanEqualToExpression(colExpression, new LiteralExpression(value, coltype));
+ //less.setRangeExpression(true);
+ rangeFilter.add(less);
+ break;
+ case BELOW:
+ LessThanExpression less2 = new LessThanExpression(colExpression, new LiteralExpression(value, coltype));
+ //less2.setRangeExpression(true);
+ rangeFilter.add(less2);
+ break;
+ default:
+ throw new AssertionError("Unhandled bound: " + range.getHigh().getBound());
+ }
+ }
+ }
+ }
+
+ if (singleValues.size() == 1) {
+ Expression ex = null;
+ if (coltype.equals(DataType.STRING)) {
+ ex = new EqualToExpression(colExpression, new LiteralExpression(((Slice) singleValues.get(0)).toStringUtf8(), coltype));
+ } else
+ ex = new EqualToExpression(colExpression, new LiteralExpression(singleValues.get(0), coltype));
+ filters.add(ex);
+ }
+ else if(singleValues.size() > 1) {
+ ListExpression candidates = null;
+ List<Expression> exs = singleValues.stream().map((a) ->
+ {
+ return new LiteralExpression(ConvertDataByType(a, type), coltype);
+ }).collect(Collectors.toList());
+ candidates = new ListExpression(exs);
+
+ if(candidates != null)
+ filters.add(new InExpression(colExpression, candidates));
+ }
+ else if(rangeFilter.size() > 0){
+ if(rangeFilter.size() > 1) {
+ Expression finalFilters = new OrExpression(rangeFilter.get(0), rangeFilter.get(1));
+ if(rangeFilter.size() > 2)
+ {
+ for(int i = 2; i< rangeFilter.size(); i++)
+ {
+ filters.add(new AndExpression(finalFilters, rangeFilter.get(i)));
+ }
+ }
+ }
+ else if(rangeFilter.size() == 1)
+ filters.add(rangeFilter.get(0));
+ }
+ }
+
+ Expression finalFilters;
+ List<Expression> tmp = filters.build();
+ if(tmp.size() > 1) {
+ finalFilters = new AndExpression(tmp.get(0), tmp.get(1));
+ if(tmp.size() > 2)
+ {
+ for(int i = 2; i< tmp.size(); i++)
+ {
+ finalFilters = new AndExpression(finalFilters, tmp.get(i));
+ }
+ }
+ }
+ else if(tmp.size() == 1)
+ finalFilters = tmp.get(0);
+ else
+ return;
+
+ // todo set into QueryModel
+ CarbonInputFormatUtil.processFilterExpression(finalFilters, carbonTable);
+ queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil.resolveFilter(finalFilters, queryModel.getAbsoluteTableIdentifier()));
+ }
+
+ public static DataType Spi2CarbondataTypeMapper(Type colType)
+ {
+ if(colType == BooleanType.BOOLEAN)
+ return DataType.BOOLEAN;
+ else if(colType == SmallintType.SMALLINT)
+ return DataType.SHORT;
+ else if(colType == IntegerType.INTEGER)
+ return DataType.INT;
+ else if(colType == BigintType.BIGINT)
+ return DataType.LONG;
+ else if(colType == DoubleType.DOUBLE)
+ return DataType.DOUBLE;
+ else if(colType == DecimalType.createDecimalType())
+ return DataType.DECIMAL;
+ else if(colType == VarcharType.VARCHAR)
+ return DataType.STRING;
+ else if(colType == DateType.DATE)
+ return DataType.DATE;
+ else if(colType == TimestampType.TIMESTAMP)
+ return DataType.TIMESTAMP;
+ else
+ return DataType.STRING;
+ }
+
+
+ public Object ConvertDataByType(Object rawdata, Type type)
+ {
+ if(type.equals(IntegerType.INTEGER))
+ return new Integer((rawdata.toString()));
+ else if(type.equals(BigintType.BIGINT))
+ return (Long)rawdata;
+ else if(type.equals(VarcharType.VARCHAR))
+ return ((Slice)rawdata).toStringUtf8();
+ else if(type.equals(BooleanType.BOOLEAN))
+ return (Boolean)(rawdata);
+
+ return rawdata;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/56720871/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
new file mode 100755
index 0000000..eb3db1e
--- /dev/null
+++ b/integration/presto/src/main/java/com/facebook/presto/carbondata/CarbondataSplit.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.carbondata;
+
+import com.facebook.presto.carbondata.impl.CarbonLocalInputSplit;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.predicate.TupleDomain;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Presto connector split for a CarbonData table: identifies the table, the CarbonData input
+ * split to read, and the constraints associated with it. Instances are immutable.
+ */
+public class CarbondataSplit implements ConnectorSplit {
+
+  private final String connectorId;
+  private final SchemaTableName schemaTableName;
+  private final TupleDomain<ColumnHandle> constraints;
+  private final CarbonLocalInputSplit localInputSplit;
+  // Defensive, immutable copy of the caller-supplied list.
+  private final List<CarbondataColumnConstraint> rebuildConstraints;
+  // Always empty: this split declares no preferred hosts.
+  private final ImmutableList<HostAddress> addresses;
+
+  /**
+   * Creates a split. Also serves as the Jackson creator when a split is deserialized.
+   *
+   * @param connectorId id of the connector that produced this split
+   * @param schemaTableName the table this split belongs to
+   * @param constraints tuple-domain constraints associated with this split
+   * @param localInputSplit the CarbonData input split to read
+   * @param rebuildConstraints per-column constraints; copied into an immutable list
+   * @throws NullPointerException if any argument is null, or if {@code rebuildConstraints}
+   *     contains a null element
+   */
+  @JsonCreator
+  public CarbondataSplit(@JsonProperty("connectorId") String connectorId,
+      @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+      @JsonProperty("constraints") TupleDomain<ColumnHandle> constraints,
+      @JsonProperty("localInputSplit") CarbonLocalInputSplit localInputSplit,
+      @JsonProperty("rebuildConstraints") List<CarbondataColumnConstraint> rebuildConstraints) {
+    this.connectorId = requireNonNull(connectorId, "connectorId is null");
+    this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
+    this.constraints = requireNonNull(constraints, "constraints is null");
+    this.localInputSplit = requireNonNull(localInputSplit, "localInputSplit is null");
+    // Copy so later mutation of the caller's list cannot change this split.
+    this.rebuildConstraints = ImmutableList.copyOf(
+        requireNonNull(rebuildConstraints, "rebuildConstraints is null"));
+    this.addresses = ImmutableList.of();
+  }
+
+  /** Returns the id of the connector that produced this split. */
+  @JsonProperty
+  public String getConnectorId() {
+    return connectorId;
+  }
+
+  /** Returns the table this split belongs to. */
+  @JsonProperty
+  public SchemaTableName getSchemaTableName() {
+    return schemaTableName;
+  }
+
+  /** Returns the tuple-domain constraints associated with this split. */
+  @JsonProperty
+  public TupleDomain<ColumnHandle> getConstraints() {
+    return constraints;
+  }
+
+  /** Returns the CarbonData input split to read. */
+  @JsonProperty
+  public CarbonLocalInputSplit getLocalInputSplit() {
+    return localInputSplit;
+  }
+
+  /** Returns the per-column constraints as an immutable list. */
+  @JsonProperty
+  public List<CarbondataColumnConstraint> getRebuildConstraints() {
+    return rebuildConstraints;
+  }
+
+  /**
+   * Returns {@code true}: the split declares no host addresses (see {@link #getAddresses()}),
+   * so it is not tied to any particular node.
+   */
+  @Override
+  public boolean isRemotelyAccessible() {
+    return true;
+  }
+
+  /** Returns an empty list; this split has no host affinity. */
+  @Override
+  public List<HostAddress> getAddresses() {
+    return addresses;
+  }
+
+  /** Returns this split itself as its info object. */
+  @Override
+  public Object getInfo() {
+    return this;
+  }
+}
+
[3/3] incubator-carbondata git commit: [CARBONDATA-728] Add
integration with Presto. This closes #650
Posted by ch...@apache.org.
[CARBONDATA-728] Add integration with Presto. This closes #650
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/9d7dbea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/9d7dbea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/9d7dbea3
Branch: refs/heads/master
Commit: 9d7dbea38496b8cb9b677eb6a04236396870ebe8
Parents: e441ab0 5672087
Author: chenliang613 <ch...@huawei.com>
Authored: Thu Mar 23 10:42:59 2017 +0530
Committer: chenliang613 <ch...@huawei.com>
Committed: Thu Mar 23 10:42:59 2017 +0530
----------------------------------------------------------------------
integration/presto/pom.xml | 167 +++++
integration/presto/src/checkstyle/checks.xml | 7 +
.../presto/src/license/LICENSE-HEADER.txt | 11 +
.../carbondata/CarbondataColumnConstraint.java | 99 +++
.../carbondata/CarbondataColumnHandle.java | 159 ++++
.../presto/carbondata/CarbondataConnector.java | 84 +++
.../carbondata/CarbondataConnectorFactory.java | 95 +++
.../carbondata/CarbondataConnectorId.java | 57 ++
.../carbondata/CarbondataHandleResolver.java | 45 ++
.../presto/carbondata/CarbondataMetadata.java | 305 ++++++++
.../presto/carbondata/CarbondataModule.java | 79 ++
.../presto/carbondata/CarbondataPlugin.java | 32 +
.../carbondata/CarbondataRecordCursor.java | 152 ++++
.../presto/carbondata/CarbondataRecordSet.java | 111 +++
.../carbondata/CarbondataRecordSetProvider.java | 292 ++++++++
.../presto/carbondata/CarbondataSplit.java | 92 +++
.../carbondata/CarbondataSplitManager.java | 300 ++++++++
.../carbondata/CarbondataTableHandle.java | 80 ++
.../carbondata/CarbondataTableLayoutHandle.java | 83 +++
.../carbondata/CarbondataTransactionHandle.java | 22 +
.../com/facebook/presto/carbondata/Types.java | 34 +
.../carbondata/impl/CarbonLocalInputSplit.java | 89 +++
.../carbondata/impl/CarbonTableCacheModel.java | 40 +
.../carbondata/impl/CarbonTableConfig.java | 58 ++
.../carbondata/impl/CarbonTableReader.java | 736 +++++++++++++++++++
25 files changed, 3229 insertions(+)
----------------------------------------------------------------------